Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in the column lineage graph.
///
/// Each node records one step of a column's lineage: the expression found at
/// that step, the query it was found in, and the child nodes reached from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of this lineage step (e.g., "table.column")
    pub name: String,
    /// The expression at this node
    pub expression: Expression,
    /// The source expression (the full query context)
    pub source: Expression,
    /// Child nodes of this step. The lineage builder pushes children here,
    /// e.g. one per set-operation branch, scalar subquery, or star source.
    pub downstream: Vec<LineageNode>,
    /// Optional source name (e.g., for derived tables).
    /// Stored as a plain `String`; `new` leaves it empty.
    pub source_name: String,
    /// Semantic source kind for downstream consumers.
    /// `new` initializes this to `SourceKind::Unknown`.
    pub source_kind: SourceKind,
    /// User-written source alias when different from canonical source name.
    /// Left out of serialized output when `None`, and defaults to `None` when
    /// the field is missing on deserialization.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Optional reference node name (e.g., for CTEs).
    /// Stored as a plain `String`; `new` leaves it empty.
    pub reference_node_name: String,
}
40
41impl LineageNode {
42    /// Create a new lineage node
43    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44        Self {
45            name: name.into(),
46            expression,
47            source,
48            downstream: Vec::new(),
49            source_name: String::new(),
50            source_kind: SourceKind::Unknown,
51            source_alias: None,
52            reference_node_name: String::new(),
53        }
54    }
55
56    /// Iterate over all nodes in the lineage graph using DFS
57    pub fn walk(&self) -> LineageWalker<'_> {
58        LineageWalker { stack: vec![self] }
59    }
60
61    /// Get all downstream column names
62    pub fn downstream_names(&self) -> Vec<String> {
63        self.downstream.iter().map(|n| n.name.clone()).collect()
64    }
65}
66
67fn source_kind_for_scope_context(
68    scope: &Scope,
69    source_name: &str,
70    reference_node_name: &str,
71) -> SourceKind {
72    if source_name.is_empty() && reference_node_name.is_empty() {
73        return SourceKind::Root;
74    }
75    if let Some(source_info) = scope.sources.get(source_name) {
76        return source_info.kind;
77    }
78    if scope.cte_sources.contains_key(source_name) {
79        return SourceKind::Cte;
80    }
81    match scope.scope_type {
82        ScopeType::Cte => SourceKind::Cte,
83        ScopeType::DerivedTable => SourceKind::DerivedTable,
84        ScopeType::Udtf => SourceKind::Virtual,
85        _ => SourceKind::Unknown,
86    }
87}
88
89fn apply_scope_context(
90    node: &mut LineageNode,
91    scope: &Scope,
92    source_name: &str,
93    reference_node_name: &str,
94) {
95    node.source_name = source_name.to_string();
96    node.reference_node_name = reference_node_name.to_string();
97    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
/// Iterator for walking the lineage graph.
///
/// Created by `LineageNode::walk`. It yields the starting node first and then
/// its descendants in depth-first order.
pub struct LineageWalker<'a> {
    /// Nodes still waiting to be yielded; the next one returned is the last element.
    stack: Vec<&'a LineageNode>,
}
104
105impl<'a> Iterator for LineageWalker<'a> {
106    type Item = &'a LineageNode;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        if let Some(node) = self.stack.pop() {
110            // Add children in reverse order so they're visited in order
111            for child in node.downstream.iter().rev() {
112                self.stack.push(child);
113            }
114            Some(node)
115        } else {
116            None
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// ColumnRef: name or positional index for column lookup
123// ---------------------------------------------------------------------------
124
/// Column reference for lineage tracing — by name or positional index.
enum ColumnRef<'a> {
    /// Look the column up by name; this is what the public entry points pass in.
    Name(&'a str),
    /// Look the column up by position. Used when recursing into set-operation
    /// branches and into scalar subqueries (which are traced with `Index(0)`).
    Index(usize),
}
130
131// ---------------------------------------------------------------------------
132// Public API
133// ---------------------------------------------------------------------------
134
135/// Build the lineage graph for a column in a SQL query
136///
137/// # Arguments
138/// * `column` - The column name to trace lineage for
139/// * `sql` - The SQL expression (SELECT, UNION, etc.)
140/// * `dialect` - Optional dialect for parsing
141/// * `trim_selects` - If true, trim the source SELECT to only include the target column
142///
143/// # Returns
144/// The root lineage node for the specified column
145///
146/// # Example
147/// ```ignore
148/// use polyglot_sql::lineage::lineage;
149/// use polyglot_sql::parse_one;
150/// use polyglot_sql::DialectType;
151///
152/// let sql = "SELECT a, b + 1 AS c FROM t";
153/// let expr = parse_one(sql, DialectType::Generic).unwrap();
154/// let node = lineage("c", &expr, None, false).unwrap();
155/// ```
156pub fn lineage(
157    column: &str,
158    sql: &Expression,
159    dialect: Option<DialectType>,
160    trim_selects: bool,
161) -> Result<LineageNode> {
162    // Fast path: skip clone when there are no CTEs to expand
163    let effective_sql = lineage_effective_expression(sql);
164    let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165    if !has_with {
166        return lineage_from_expression(column, sql, dialect, trim_selects);
167    }
168    let mut owned = sql.clone();
169    expand_cte_stars(&mut owned, None);
170    lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173/// Build the lineage graph for a column in a SQL query using optional schema metadata.
174///
175/// When `schema` is provided, the query is first qualified with
176/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
177/// ambiguous column references.
178///
179/// # Arguments
180/// * `column` - The column name to trace lineage for
181/// * `sql` - The SQL expression (SELECT, UNION, etc.)
182/// * `schema` - Optional schema used for qualification
183/// * `dialect` - Optional dialect for qualification and lineage handling
184/// * `trim_selects` - If true, trim the source SELECT to only include the target column
185///
186/// # Returns
187/// The root lineage node for the specified column
188pub fn lineage_with_schema(
189    column: &str,
190    sql: &Expression,
191    schema: Option<&dyn Schema>,
192    dialect: Option<DialectType>,
193    trim_selects: bool,
194) -> Result<LineageNode> {
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new().with_dialect(dialect_type)
198        } else {
199            QualifyColumnsOptions::new()
200        };
201
202        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203            Error::internal(format!("Lineage qualification failed with schema: {}", e))
204        })?
205    } else {
206        sql.clone()
207    };
208
209    // Annotate types in-place so lineage nodes carry type information
210    annotate_types(&mut qualified_expression, schema, dialect);
211
212    // Expand CTE stars on the already-owned expression (no extra clone).
213    // Pass schema so that stars from external tables can also be resolved.
214    expand_cte_stars(&mut qualified_expression, schema);
215
216    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220    column: &str,
221    sql: &Expression,
222    dialect: Option<DialectType>,
223    trim_selects: bool,
224) -> Result<LineageNode> {
225    let sql = lineage_effective_expression(sql);
226    let scope = build_scope(sql);
227    to_node(
228        ColumnRef::Name(column),
229        &scope,
230        dialect,
231        "",
232        "",
233        "",
234        trim_selects,
235    )
236}
237
238fn lineage_effective_expression(sql: &Expression) -> &Expression {
239    match sql {
240        Expression::Prepare(prepare) => &prepare.statement,
241        _ => sql,
242    }
243}
244
245// ---------------------------------------------------------------------------
246// CTE star expansion
247// ---------------------------------------------------------------------------
248
249/// Normalize an identifier for CTE name matching.
250///
251/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
252/// quoted identifiers preserve their original case. This matches sqlglot's
253/// `normalize_identifiers` behavior.
254fn normalize_cte_name(ident: &Identifier) -> String {
255    if ident.quoted {
256        ident.name.clone()
257    } else {
258        ident.name.to_lowercase()
259    }
260}
261
262/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
263/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
264/// which qualify_columns cannot resolve because it processes each SELECT independently.
265///
266/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
267/// by looking up column names in the schema. This enables correct expansion of patterns
268/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
269///
270/// CTE name matching follows SQL identifier semantics: unquoted names are compared
271/// case-insensitively (lowercased), while quoted names preserve their original case.
272/// This matches sqlglot's `normalize_identifiers` behavior.
273pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
274    if let Expression::Prepare(prepare) = expr {
275        expand_cte_stars(&mut prepare.statement, schema);
276        return;
277    }
278
279    let select = match expr {
280        Expression::Select(s) => s,
281        _ => return,
282    };
283
284    let with = match &mut select.with {
285        Some(w) => w,
286        None => return,
287    };
288
289    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
290
291    for cte in &mut with.ctes {
292        let cte_name = normalize_cte_name(&cte.alias);
293
294        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
295        if !cte.columns.is_empty() {
296            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
297            resolved_cte_columns.insert(cte_name, cols);
298            continue;
299        }
300
301        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
302        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
303        // references itself. We detect this conservatively: if the CTE name appears as
304        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
305        // block are still expanded.
306        if with.recursive {
307            let is_self_referencing =
308                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
309                    let body_sources = get_select_sources(body_select);
310                    body_sources.iter().any(|s| s.normalized == cte_name)
311                } else {
312                    false
313                };
314            if is_self_referencing {
315                continue;
316            }
317        }
318
319        // Get the SELECT from the CTE body (handle UNION by taking left branch)
320        let body_select = match get_leftmost_select_mut(&mut cte.this) {
321            Some(s) => s,
322            None => continue,
323        };
324
325        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
326        resolved_cte_columns.insert(cte_name, columns);
327    }
328
329    // Also expand stars in the outer SELECT itself
330    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
331}
332
333/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
334///
335/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
336/// are determined by the left branch. This matches sqlglot's behavior.
337fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
338    let mut current = expr;
339    for _ in 0..MAX_LINEAGE_DEPTH {
340        match current {
341            Expression::Select(s) => return Some(s),
342            Expression::Union(u) => current = &mut u.left,
343            Expression::Intersect(i) => current = &mut i.left,
344            Expression::Except(e) => current = &mut e.left,
345            Expression::Paren(p) => current = &mut p.this,
346            _ => return None,
347        }
348    }
349    None
350}
351
352/// Rewrite star expressions in a SELECT using resolved CTE column lists.
353/// Falls back to `schema` for external table column lookup.
354/// Returns the list of output column names after expansion.
355fn rewrite_stars_in_select(
356    select: &mut Select,
357    resolved_ctes: &HashMap<String, Vec<String>>,
358    schema: Option<&dyn Schema>,
359) -> Vec<String> {
360    // The AST represents star expressions in two forms depending on syntax:
361    //   - `SELECT *`      → Expression::Star (unqualified star)
362    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
363    // Both must be checked to handle all star patterns.
364    let has_star = select
365        .expressions
366        .iter()
367        .any(|e| matches!(e, Expression::Star(_)));
368    let has_qualified_star = select
369        .expressions
370        .iter()
371        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
372
373    if !has_star && !has_qualified_star {
374        // No stars — just extract column names without rewriting
375        return select
376            .expressions
377            .iter()
378            .filter_map(get_expression_output_name)
379            .collect();
380    }
381
382    let sources = get_select_sources(select);
383    let mut new_expressions = Vec::new();
384    let mut result_columns = Vec::new();
385
386    for expr in &select.expressions {
387        match expr {
388            Expression::Star(star) => {
389                let qual = star.table.as_ref();
390                if let Some(expanded) =
391                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
392                {
393                    for (src_alias, col_name) in &expanded {
394                        let table_id = Identifier::new(src_alias);
395                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
396                        result_columns.push(col_name.clone());
397                    }
398                } else {
399                    new_expressions.push(expr.clone());
400                    result_columns.push("*".to_string());
401                }
402            }
403            Expression::Column(c) if c.name.name == "*" => {
404                let qual = c.table.as_ref();
405                if let Some(expanded) =
406                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
407                {
408                    for (_src_alias, col_name) in &expanded {
409                        // Keep the original table qualifier for qualified stars (table.*)
410                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
411                        result_columns.push(col_name.clone());
412                    }
413                } else {
414                    new_expressions.push(expr.clone());
415                    result_columns.push("*".to_string());
416                }
417            }
418            _ => {
419                new_expressions.push(expr.clone());
420                if let Some(name) = get_expression_output_name(expr) {
421                    result_columns.push(name);
422                }
423            }
424        }
425    }
426
427    select.expressions = new_expressions;
428    result_columns
429}
430
431/// Try to expand a star expression by looking up source columns from resolved CTEs,
432/// falling back to the schema for external tables.
433/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
434/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
435fn expand_star_from_sources(
436    qualifier: Option<&Identifier>,
437    sources: &[SourceInfo],
438    resolved_ctes: &HashMap<String, Vec<String>>,
439    schema: Option<&dyn Schema>,
440) -> Option<Vec<(String, String)>> {
441    let mut expanded = Vec::new();
442
443    if let Some(qual) = qualifier {
444        // Qualified star: table.*
445        let qual_normalized = normalize_cte_name(qual);
446        for src in sources {
447            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
448                // Try CTE first
449                if let Some(cols) = resolved_ctes.get(&src.normalized) {
450                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
451                    return Some(expanded);
452                }
453                // Fall back to schema
454                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
455                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
456                    return Some(expanded);
457                }
458            }
459        }
460        None
461    } else {
462        // Unqualified star: expand all sources.
463        // Intentionally conservative: if any source can't be resolved, the entire
464        // expansion is aborted. Partial expansion would produce an incomplete column
465        // list, causing downstream lineage resolution to silently omit columns.
466        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
467        let mut any_expanded = false;
468        for src in sources {
469            if let Some(cols) = resolved_ctes.get(&src.normalized) {
470                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
471                any_expanded = true;
472            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
473                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
474                any_expanded = true;
475            } else {
476                return None;
477            }
478        }
479        if any_expanded {
480            Some(expanded)
481        } else {
482            None
483        }
484    }
485}
486
487/// Look up column names for a table from the schema.
488fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
489    let schema = schema?;
490    if fq_name.is_empty() {
491        return None;
492    }
493    schema
494        .column_names(fq_name)
495        .ok()
496        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
497}
498
499/// Create a Column expression with the given name and optional table qualifier.
500fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
501    Expression::Column(Box::new(crate::expressions::Column {
502        name: Identifier::new(name),
503        table: table.cloned(),
504        join_mark: false,
505        trailing_comments: Vec::new(),
506        span: None,
507        inferred_type: None,
508    }))
509}
510
511/// Extract the output name of a SELECT expression.
512fn get_expression_output_name(expr: &Expression) -> Option<String> {
513    match expr {
514        Expression::Alias(a) => Some(a.alias.name.clone()),
515        Expression::Column(c) => Some(c.name.name.clone()),
516        Expression::Identifier(id) => Some(id.name.clone()),
517        Expression::Star(_) => Some("*".to_string()),
518        _ => None,
519    }
520}
521
/// Source info extracted from a SELECT's FROM/JOIN clauses in a single pass.
struct SourceInfo {
    /// Name the source is referred to by in the query: its alias when it has
    /// one, otherwise (for tables) the table name itself.
    alias: String,
    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
    normalized: String,
    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
    /// For non-table sources this is simply the alias.
    fq_name: String,
}
530
531/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
532/// SELECT's FROM and JOIN clauses in a single pass.
533fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
534    let mut sources = Vec::new();
535
536    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
537        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
538            SourceInfo {
539                alias: alias.name.clone(),
540                normalized: normalize_cte_name(alias),
541                fq_name: alias.name.clone(),
542            }
543        }
544
545        fn named_virtual_source_info(alias: &str) -> SourceInfo {
546            SourceInfo {
547                alias: alias.to_string(),
548                normalized: alias.to_lowercase(),
549                fq_name: alias.to_string(),
550            }
551        }
552
553        match expr {
554            Expression::Table(t) => {
555                let normalized = normalize_cte_name(&t.name);
556                let alias = t
557                    .alias
558                    .as_ref()
559                    .map(|a| a.name.clone())
560                    .unwrap_or_else(|| t.name.name.clone());
561                let mut parts = Vec::new();
562                if let Some(catalog) = &t.catalog {
563                    parts.push(catalog.name.clone());
564                }
565                if let Some(schema) = &t.schema {
566                    parts.push(schema.name.clone());
567                }
568                parts.push(t.name.name.clone());
569                let fq_name = parts.join(".");
570                Some(SourceInfo {
571                    alias,
572                    normalized,
573                    fq_name,
574                })
575            }
576            Expression::Subquery(s) => {
577                let alias = s.alias.as_ref()?.name.clone();
578                let normalized = alias.to_lowercase();
579                let fq_name = alias.clone();
580                Some(SourceInfo {
581                    alias,
582                    normalized,
583                    fq_name,
584                })
585            }
586            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
587            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
588                Some(virtual_source_info(&a.alias))
589            }
590            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
591            Expression::LateralView(lateral_view) => lateral_view
592                .table_alias
593                .as_ref()
594                .or_else(|| lateral_view.column_aliases.first())
595                .map(virtual_source_info),
596            Expression::Paren(p) => extract_source(&p.this),
597            _ => None,
598        }
599    }
600
601    if let Some(from) = &select.from {
602        for expr in &from.expressions {
603            if let Some(info) = extract_source(expr) {
604                sources.push(info);
605            }
606        }
607    }
608    for join in &select.joins {
609        if let Some(info) = extract_source(&join.this) {
610            sources.push(info);
611        }
612    }
613    for lateral_view in &select.lateral_views {
614        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
615        {
616            sources.push(info);
617        }
618    }
619    sources
620}
621
622/// Get all source tables from a lineage graph
623pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
624    let mut tables = HashSet::new();
625    collect_source_tables(node, &mut tables);
626    tables
627}
628
629/// Recursively collect source table names from lineage graph
630pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
631    if let Expression::Table(table) = &node.source {
632        tables.insert(table.name.name.clone());
633    }
634    for child in &node.downstream {
635        collect_source_tables(child, tables);
636    }
637}
638
639// ---------------------------------------------------------------------------
640// Core recursive lineage builder
641// ---------------------------------------------------------------------------
642
/// Maximum recursion depth for lineage tracing to prevent stack overflow
/// on circular or deeply nested CTE chains.
///
/// `to_node_inner` returns an error once its depth exceeds this value. The same
/// constant also bounds how many nested set-operation / parenthesis levels
/// `get_leftmost_select_mut` will descend through before giving up.
const MAX_LINEAGE_DEPTH: usize = 64;
646
647/// Recursively build a lineage node for a column in a scope.
648fn to_node(
649    column: ColumnRef<'_>,
650    scope: &Scope,
651    dialect: Option<DialectType>,
652    scope_name: &str,
653    source_name: &str,
654    reference_node_name: &str,
655    trim_selects: bool,
656) -> Result<LineageNode> {
657    to_node_inner(
658        column,
659        scope,
660        dialect,
661        scope_name,
662        source_name,
663        reference_node_name,
664        trim_selects,
665        &[],
666        0,
667    )
668}
669
/// Recursive worker behind `to_node`: build the lineage node for `column` in
/// `scope` and attach its children.
///
/// Steps, in order:
/// 1. Fail if `depth` exceeds `MAX_LINEAGE_DEPTH`.
/// 2. Look through a CTE wrapper to the query it contains.
/// 3. Delegate UNION / INTERSECT / EXCEPT to `handle_set_operation`.
/// 4. Locate the select expression for `column` and create the node
///    (optionally with a trimmed source when `trim_selects` is set).
/// 5. For a star projection, add one child per scope source and return.
/// 6. Otherwise add a child for each alias-less subquery found in the select
///    expression, then resolve every column reference it contains.
///
/// `ancestor_cte_scopes` are CTE scopes inherited from enclosing queries. They
/// are combined with this scope's own CTE scopes for column resolution.
///
/// # Errors
/// Returns an internal error when the depth limit is exceeded, and propagates
/// errors from `find_select_expr` and `handle_set_operation`. Failures while
/// tracing a subquery in the select list are dropped (that child is omitted).
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    // Guard against runaway recursion before doing any work.
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // Build combined CTE scopes: current scope's cte_scopes first, then ancestors.
    // This combined list is what the column resolution helpers receive in step 7.
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }

    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
    //    but we need the inner query (SELECT/UNION) for column lookup.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    // 1. Set operations (UNION / INTERSECT / EXCEPT)
    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        // For CTE wrapping a set op, create a temporary scope with the inner expression
        // so that handle_set_operation sees the set operation as the scope expression.
        // NOTE(review): `scope_type` is not among the copied fields, so the temporary
        // scope keeps whatever `Scope::new` assigns — confirm this is intended, since
        // the source-kind fallback reads `scope_type`.
        if matches!(scope_expr, Expression::Cte(_)) {
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                ancestor_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            ancestor_cte_scopes,
            depth,
        );
    }

    // 2. Find the select expression for this column (errors propagate to the caller)
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // 3. Trim source if requested; otherwise the node keeps a copy of the whole query
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    // 4. Create the lineage node and record where it came from
    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    // 5. Star handling — add a "<source>.*" child for each source of the scope.
    //    Stars are not traced any further, hence the early return.
    if matches!(&select_expr, Expression::Star(_)) {
        for (name, source_info) in &scope.sources {
            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: None,
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // 6. Subqueries in select — trace through scalar subqueries.
    //    Only subqueries without an alias are considered. Each is paired with the
    //    first subquery scope whose expression equals the subquery body and is
    //    traced by position (index 0).
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    // Best effort: a subquery that fails to trace is simply not added.
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        ancestor_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // 7. Column references — trace each column to its source.
    //    References with a table qualifier and references without one are
    //    handled by separate helpers; both receive the combined CTE scopes.
    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
835
836// ---------------------------------------------------------------------------
837// Set operation handling
838// ---------------------------------------------------------------------------
839
840fn handle_set_operation(
841    column: &ColumnRef<'_>,
842    scope: &Scope,
843    dialect: Option<DialectType>,
844    scope_name: &str,
845    source_name: &str,
846    reference_node_name: &str,
847    trim_selects: bool,
848    ancestor_cte_scopes: &[Scope],
849    depth: usize,
850) -> Result<LineageNode> {
851    let scope_expr = &scope.expression;
852
853    // Determine column index
854    let col_index = match column {
855        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
856        ColumnRef::Index(i) => *i,
857    };
858
859    let col_name = match column {
860        ColumnRef::Name(name) => name.to_string(),
861        ColumnRef::Index(_) => format!("_{col_index}"),
862    };
863
864    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
865    apply_scope_context(&mut node, scope, source_name, reference_node_name);
866
867    // Recurse into each union branch
868    for branch_scope in &scope.union_scopes {
869        if let Ok(child) = to_node_inner(
870            ColumnRef::Index(col_index),
871            branch_scope,
872            dialect,
873            scope_name,
874            "",
875            "",
876            trim_selects,
877            ancestor_cte_scopes,
878            depth + 1,
879        ) {
880            node.downstream.push(child);
881        }
882    }
883
884    Ok(node)
885}
886
887// ---------------------------------------------------------------------------
888// Column resolution helpers
889// ---------------------------------------------------------------------------
890
891fn resolve_qualified_column(
892    node: &mut LineageNode,
893    scope: &Scope,
894    dialect: Option<DialectType>,
895    table: &str,
896    col_name: &str,
897    parent_name: &str,
898    trim_selects: bool,
899    all_cte_scopes: &[&Scope],
900    depth: usize,
901) {
902    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
903    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
904    let resolved_cte_name = resolve_cte_alias(scope, table);
905    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
906
907    // Check if table is a CTE reference — check both the current scope's cte_sources
908    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
909    let is_cte = scope.cte_sources.contains_key(effective_table)
910        || all_cte_scopes.iter().any(
911            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
912        );
913    if is_cte {
914        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
915            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
916            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
917            if let Ok(child) = to_node_inner(
918                ColumnRef::Name(col_name),
919                child_scope,
920                dialect,
921                parent_name,
922                effective_table,
923                parent_name,
924                trim_selects,
925                &ancestors,
926                depth + 1,
927            ) {
928                node.downstream.push(child);
929                return;
930            }
931        }
932    }
933
934    // Check if table is a derived table (is_scope = true in sources)
935    if let Some(source_info) = scope.sources.get(table) {
936        if source_info.is_scope {
937            if let Some(child_scope) = find_child_scope(scope, table) {
938                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
939                if let Ok(child) = to_node_inner(
940                    ColumnRef::Name(col_name),
941                    child_scope,
942                    dialect,
943                    parent_name,
944                    table,
945                    parent_name,
946                    trim_selects,
947                    &ancestors,
948                    depth + 1,
949                ) {
950                    node.downstream.push(child);
951                    return;
952                }
953            }
954        }
955    }
956
957    // Base table source found in current scope: preserve alias in the display name
958    // but store the resolved table expression and name for downstream consumers.
959    if let Some(source_info) = scope.sources.get(table) {
960        if !source_info.is_scope {
961            let mut child = make_table_column_node_from_source(table, col_name, source_info);
962            if source_info.kind == SourceKind::Virtual {
963                attach_virtual_source_dependencies(
964                    &mut child,
965                    scope,
966                    dialect,
967                    table,
968                    &source_info.expression,
969                    trim_selects,
970                    all_cte_scopes,
971                    depth,
972                );
973            }
974            node.downstream.push(child);
975            return;
976        }
977    }
978
979    // Base table or unresolved — terminal node
980    node.downstream
981        .push(make_table_column_node(table, col_name));
982}
983
984/// Resolve a FROM alias to the original CTE name.
985///
986/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
987/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
988/// This function checks if `name` is such an alias and returns the CTE name.
989fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
990    // If it's already a known CTE name, no resolution needed
991    if scope.cte_sources.contains_key(name) {
992        return None;
993    }
994    // Check if the source's expression is a CTE — if so, extract the CTE name
995    if let Some(source_info) = scope.sources.get(name) {
996        if source_info.is_scope {
997            if let Expression::Cte(cte) = &source_info.expression {
998                let cte_name = &cte.alias.name;
999                if scope.cte_sources.contains_key(cte_name) {
1000                    return Some(cte_name.clone());
1001                }
1002            }
1003        }
1004    }
1005    None
1006}
1007
1008fn resolve_unqualified_column(
1009    node: &mut LineageNode,
1010    scope: &Scope,
1011    dialect: Option<DialectType>,
1012    col_name: &str,
1013    parent_name: &str,
1014    trim_selects: bool,
1015    all_cte_scopes: &[&Scope],
1016    depth: usize,
1017) {
1018    // Try to find which source this column belongs to.
1019    // Build the source list from the actual FROM/JOIN clauses to avoid
1020    // mixing in CTE definitions that are in scope but not referenced.
1021    let from_source_names = source_names_from_from_join(scope);
1022
1023    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1024        resolve_qualified_column(
1025            node,
1026            scope,
1027            dialect,
1028            &tbl,
1029            col_name,
1030            parent_name,
1031            trim_selects,
1032            all_cte_scopes,
1033            depth,
1034        );
1035        return;
1036    }
1037
1038    if from_source_names.len() == 1 {
1039        let tbl = &from_source_names[0];
1040        resolve_qualified_column(
1041            node,
1042            scope,
1043            dialect,
1044            tbl,
1045            col_name,
1046            parent_name,
1047            trim_selects,
1048            all_cte_scopes,
1049            depth,
1050        );
1051        return;
1052    }
1053
1054    // Multiple sources — can't resolve without schema info, add unqualified node
1055    let child = LineageNode::new(
1056        col_name.to_string(),
1057        Expression::Column(Box::new(crate::expressions::Column {
1058            name: crate::expressions::Identifier::new(col_name.to_string()),
1059            table: None,
1060            join_mark: false,
1061            trailing_comments: vec![],
1062            span: None,
1063            inferred_type: None,
1064        })),
1065        node.source.clone(),
1066    );
1067    node.downstream.push(child);
1068}
1069
1070fn unique_virtual_source_for_column(
1071    scope: &Scope,
1072    source_names: &[String],
1073    col_name: &str,
1074) -> Option<String> {
1075    let mut matches = source_names.iter().filter_map(|source_name| {
1076        let source = scope.sources.get(source_name)?;
1077        if source.kind == SourceKind::Virtual
1078            && virtual_source_output_columns(source)
1079                .any(|column| column.eq_ignore_ascii_case(col_name))
1080        {
1081            Some(source_name.clone())
1082        } else {
1083            None
1084        }
1085    });
1086
1087    let first = matches.next()?;
1088    if matches.next().is_none() {
1089        Some(first)
1090    } else {
1091        None
1092    }
1093}
1094
1095fn virtual_source_output_columns(
1096    source_info: &ScopeSourceInfo,
1097) -> Box<dyn Iterator<Item = String> + '_> {
1098    match &source_info.expression {
1099        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1100        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1101            Box::new(alias_output_columns(alias))
1102        }
1103        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1104        Expression::LateralView(lateral_view) => {
1105            Box::new(lateral_view_output_columns(lateral_view))
1106        }
1107        _ => Box::new(source_info.alias.clone().into_iter()),
1108    }
1109}
1110
1111fn unnest_output_columns(
1112    unnest: &crate::expressions::UnnestFunc,
1113) -> impl Iterator<Item = String> + '_ {
1114    unnest
1115        .alias
1116        .iter()
1117        .map(|alias| alias.name.clone())
1118        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1119}
1120
1121fn alias_output_columns(
1122    alias: &crate::expressions::Alias,
1123) -> Box<dyn Iterator<Item = String> + '_> {
1124    if alias.column_aliases.is_empty() {
1125        Box::new(std::iter::once(alias.alias.name.clone()))
1126    } else {
1127        Box::new(
1128            alias
1129                .column_aliases
1130                .iter()
1131                .map(|column| column.name.clone()),
1132        )
1133    }
1134}
1135
1136fn lateral_output_columns(
1137    lateral: &crate::expressions::Lateral,
1138) -> Box<dyn Iterator<Item = String> + '_> {
1139    if lateral.column_aliases.is_empty() {
1140        default_virtual_output_columns(&lateral.this)
1141    } else {
1142        Box::new(lateral.column_aliases.iter().cloned())
1143    }
1144}
1145
1146fn lateral_view_output_columns(
1147    lateral_view: &crate::expressions::LateralView,
1148) -> Box<dyn Iterator<Item = String> + '_> {
1149    Box::new(
1150        lateral_view
1151            .column_aliases
1152            .iter()
1153            .map(|column| column.name.clone()),
1154    )
1155}
1156
1157fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1158    match expr {
1159        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1160        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1161            alias_output_columns(alias)
1162        }
1163        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1164            Box::new(
1165                ["seq", "key", "path", "index", "value", "this"]
1166                    .into_iter()
1167                    .map(String::from),
1168            )
1169        }
1170        _ => Box::new(std::iter::empty()),
1171    }
1172}
1173
1174fn attach_virtual_source_dependencies(
1175    node: &mut LineageNode,
1176    scope: &Scope,
1177    dialect: Option<DialectType>,
1178    source_alias: &str,
1179    source_expr: &Expression,
1180    trim_selects: bool,
1181    all_cte_scopes: &[&Scope],
1182    depth: usize,
1183) {
1184    let parent_name = node.name.clone();
1185    let mut seen = HashSet::new();
1186    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1187        let key = (
1188            col_ref.table.as_ref().map(|t| t.name.clone()),
1189            col_ref.column.clone(),
1190        );
1191        if !seen.insert(key) {
1192            continue;
1193        }
1194
1195        if let Some(table_id) = col_ref.table {
1196            let table = table_id.name;
1197            if table == source_alias {
1198                continue;
1199            }
1200            resolve_qualified_column(
1201                node,
1202                scope,
1203                dialect,
1204                &table,
1205                &col_ref.column,
1206                &parent_name,
1207                trim_selects,
1208                all_cte_scopes,
1209                depth + 1,
1210            );
1211        } else {
1212            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1213            if non_virtual_sources.len() == 1 {
1214                resolve_qualified_column(
1215                    node,
1216                    scope,
1217                    dialect,
1218                    &non_virtual_sources[0],
1219                    &col_ref.column,
1220                    &parent_name,
1221                    trim_selects,
1222                    all_cte_scopes,
1223                    depth + 1,
1224                );
1225            }
1226        }
1227    }
1228}
1229
1230fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1231    fn source_name(expr: &Expression) -> Option<String> {
1232        match expr {
1233            Expression::Table(table) => Some(
1234                table
1235                    .alias
1236                    .as_ref()
1237                    .map(|a| a.name.clone())
1238                    .unwrap_or_else(|| table.name.name.clone()),
1239            ),
1240            Expression::Subquery(subquery) => {
1241                subquery.alias.as_ref().map(|alias| alias.name.clone())
1242            }
1243            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1244            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1245                Some(alias.alias.name.clone())
1246            }
1247            Expression::Lateral(lateral) => lateral.alias.clone(),
1248            Expression::LateralView(lateral_view) => lateral_view
1249                .table_alias
1250                .as_ref()
1251                .or_else(|| lateral_view.column_aliases.first())
1252                .map(|alias| alias.name.clone()),
1253            Expression::Paren(paren) => source_name(&paren.this),
1254            _ => None,
1255        }
1256    }
1257
1258    let effective_expr = match &scope.expression {
1259        Expression::Cte(cte) => &cte.this,
1260        expr => expr,
1261    };
1262
1263    let mut names = Vec::new();
1264    let mut seen = std::collections::HashSet::new();
1265
1266    if let Expression::Select(select) = effective_expr {
1267        if let Some(from) = &select.from {
1268            for expr in &from.expressions {
1269                if let Some(name) = source_name(expr) {
1270                    if !name.is_empty() && seen.insert(name.clone()) {
1271                        names.push(name);
1272                    }
1273                }
1274            }
1275        }
1276        for join in &select.joins {
1277            if let Some(name) = source_name(&join.this) {
1278                if !name.is_empty() && seen.insert(name.clone()) {
1279                    names.push(name);
1280                }
1281            }
1282        }
1283        for lateral_view in &select.lateral_views {
1284            if let Some(name) =
1285                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1286            {
1287                if !name.is_empty() && seen.insert(name.clone()) {
1288                    names.push(name);
1289                }
1290            }
1291        }
1292    }
1293
1294    names
1295}
1296
1297fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1298    source_names_from_from_join(scope)
1299        .into_iter()
1300        .filter(|name| {
1301            !matches!(
1302                scope.sources.get(name).map(|source| source.kind),
1303                Some(SourceKind::Virtual)
1304            )
1305        })
1306        .collect()
1307}
1308
1309// ---------------------------------------------------------------------------
1310// Helper functions
1311// ---------------------------------------------------------------------------
1312
1313/// Get the alias or name of an expression
1314fn get_alias_or_name(expr: &Expression) -> Option<String> {
1315    match expr {
1316        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1317        Expression::Column(col) => Some(col.name.name.clone()),
1318        Expression::Identifier(id) => Some(id.name.clone()),
1319        Expression::Star(_) => Some("*".to_string()),
1320        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1321        // Unwrap to get the actual column/alias name from the inner expression.
1322        Expression::Annotated(a) => get_alias_or_name(&a.this),
1323        _ => None,
1324    }
1325}
1326
1327/// Resolve the display name for a column reference.
1328fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1329    match column {
1330        ColumnRef::Name(n) => n.to_string(),
1331        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1332    }
1333}
1334
1335/// Find the select expression matching a column reference.
1336fn find_select_expr(
1337    scope_expr: &Expression,
1338    column: &ColumnRef<'_>,
1339    dialect: Option<DialectType>,
1340) -> Result<Expression> {
1341    if let Expression::Select(ref select) = scope_expr {
1342        match column {
1343            ColumnRef::Name(name) => {
1344                let normalized_name = normalize_column_name(name, dialect);
1345                for expr in &select.expressions {
1346                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1347                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1348                            return Ok(expr.clone());
1349                        }
1350                    }
1351                }
1352                Err(crate::error::Error::parse(
1353                    format!("Cannot find column '{}' in query", name),
1354                    0,
1355                    0,
1356                    0,
1357                    0,
1358                ))
1359            }
1360            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1361                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1362            }),
1363        }
1364    } else {
1365        Err(crate::error::Error::parse(
1366            "Expected SELECT expression for column lookup",
1367            0,
1368            0,
1369            0,
1370            0,
1371        ))
1372    }
1373}
1374
1375/// Find the positional index of a column name in a set operation's first SELECT branch.
1376fn column_to_index(
1377    set_op_expr: &Expression,
1378    name: &str,
1379    dialect: Option<DialectType>,
1380) -> Result<usize> {
1381    let normalized_name = normalize_column_name(name, dialect);
1382    let mut expr = set_op_expr;
1383    loop {
1384        match expr {
1385            Expression::Union(u) => expr = &u.left,
1386            Expression::Intersect(i) => expr = &i.left,
1387            Expression::Except(e) => expr = &e.left,
1388            Expression::Select(select) => {
1389                for (i, e) in select.expressions.iter().enumerate() {
1390                    if let Some(alias_or_name) = get_alias_or_name(e) {
1391                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1392                            return Ok(i);
1393                        }
1394                    }
1395                }
1396                return Err(crate::error::Error::parse(
1397                    format!("Cannot find column '{}' in set operation", name),
1398                    0,
1399                    0,
1400                    0,
1401                    0,
1402                ));
1403            }
1404            _ => {
1405                return Err(crate::error::Error::parse(
1406                    "Expected SELECT or set operation",
1407                    0,
1408                    0,
1409                    0,
1410                    0,
1411                ))
1412            }
1413        }
1414    }
1415}
1416
1417fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1418    normalize_name(name, dialect, false, true)
1419}
1420
1421/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1422fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1423    if let Expression::Select(select) = select_expr {
1424        let mut trimmed = select.as_ref().clone();
1425        trimmed.expressions = vec![target_expr.clone()];
1426        Expression::Select(Box::new(trimmed))
1427    } else {
1428        select_expr.clone()
1429    }
1430}
1431
1432/// Find the child scope (CTE or derived table) for a given source name.
1433fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1434    // Check CTE scopes
1435    if scope.cte_sources.contains_key(source_name) {
1436        for cte_scope in &scope.cte_scopes {
1437            if let Expression::Cte(cte) = &cte_scope.expression {
1438                if cte.alias.name == source_name {
1439                    return Some(cte_scope);
1440                }
1441            }
1442        }
1443    }
1444
1445    // Check derived table scopes
1446    if let Some(source_info) = scope.sources.get(source_name) {
1447        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1448            if let Expression::Subquery(sq) = &source_info.expression {
1449                for dt_scope in &scope.derived_table_scopes {
1450                    if dt_scope.expression == sq.this {
1451                        return Some(dt_scope);
1452                    }
1453                }
1454            }
1455        }
1456    }
1457
1458    None
1459}
1460
1461/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1462/// This handles nested CTEs where the current scope doesn't have the CTE scope
1463/// as a direct child but knows about it via cte_sources.
1464fn find_child_scope_in<'a>(
1465    all_cte_scopes: &[&'a Scope],
1466    scope: &'a Scope,
1467    source_name: &str,
1468) -> Option<&'a Scope> {
1469    // First try the scope's own cte_scopes
1470    for cte_scope in &scope.cte_scopes {
1471        if let Expression::Cte(cte) = &cte_scope.expression {
1472            if cte.alias.name == source_name {
1473                return Some(cte_scope);
1474            }
1475        }
1476    }
1477
1478    // Then search through all ancestor CTE scopes
1479    for cte_scope in all_cte_scopes {
1480        if let Expression::Cte(cte) = &cte_scope.expression {
1481            if cte.alias.name == source_name {
1482                return Some(cte_scope);
1483            }
1484        }
1485    }
1486
1487    // Fall back to derived table scopes
1488    if let Some(source_info) = scope.sources.get(source_name) {
1489        if source_info.is_scope {
1490            if let Expression::Subquery(sq) = &source_info.expression {
1491                for dt_scope in &scope.derived_table_scopes {
1492                    if dt_scope.expression == sq.this {
1493                        return Some(dt_scope);
1494                    }
1495                }
1496            }
1497        }
1498    }
1499
1500    None
1501}
1502
1503/// Create a terminal lineage node for a table.column reference.
1504fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1505    let mut node = LineageNode::new(
1506        format!("{}.{}", table, column),
1507        Expression::Column(Box::new(crate::expressions::Column {
1508            name: crate::expressions::Identifier::new(column.to_string()),
1509            table: Some(crate::expressions::Identifier::new(table.to_string())),
1510            join_mark: false,
1511            trailing_comments: vec![],
1512            span: None,
1513            inferred_type: None,
1514        })),
1515        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1516    );
1517    node.source_name = table.to_string();
1518    node.source_kind = SourceKind::Table;
1519    node
1520}
1521
1522fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1523    let mut parts: Vec<String> = Vec::new();
1524    if let Some(catalog) = &table_ref.catalog {
1525        parts.push(catalog.name.clone());
1526    }
1527    if let Some(schema) = &table_ref.schema {
1528        parts.push(schema.name.clone());
1529    }
1530    parts.push(table_ref.name.name.clone());
1531    parts.join(".")
1532}
1533
1534fn apply_source_info_context(
1535    node: &mut LineageNode,
1536    source_key: &str,
1537    source_info: &ScopeSourceInfo,
1538) {
1539    node.source_kind = source_info.kind;
1540    node.source_name =
1541        source_info
1542            .lineage_name
1543            .clone()
1544            .unwrap_or_else(|| match &source_info.expression {
1545                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1546                _ => source_key.to_string(),
1547            });
1548    node.source_alias = source_info.alias.clone();
1549}
1550
1551fn make_table_column_node_from_source(
1552    source_key: &str,
1553    column: &str,
1554    source_info: &ScopeSourceInfo,
1555) -> LineageNode {
1556    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1557    let mut node = LineageNode::new(
1558        format!("{}.{}", lineage_name, column),
1559        Expression::Column(Box::new(crate::expressions::Column {
1560            name: crate::expressions::Identifier::new(column.to_string()),
1561            table: Some(crate::expressions::Identifier::new(
1562                lineage_name.to_string(),
1563            )),
1564            join_mark: false,
1565            trailing_comments: vec![],
1566            span: None,
1567            inferred_type: None,
1568        })),
1569        source_info.expression.clone(),
1570    );
1571
1572    apply_source_info_context(&mut node, source_key, source_info);
1573
1574    node
1575}
1576
1577/// Simple column reference extracted from an expression
1578#[derive(Debug, Clone)]
1579struct SimpleColumnRef {
1580    table: Option<crate::expressions::Identifier>,
1581    column: String,
1582}
1583
1584/// Find all column references in an expression (does not recurse into subqueries).
1585fn find_column_refs_in_expr(
1586    expr: &Expression,
1587    dialect: Option<DialectType>,
1588) -> Vec<SimpleColumnRef> {
1589    let mut refs = Vec::new();
1590    collect_column_refs(expr, dialect, &mut refs);
1591    refs
1592}
1593
1594fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1595    match expr {
1596        Expression::Column(col) => {
1597            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1598        }
1599        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1600        _ => false,
1601    }
1602}
1603
1604fn collect_column_refs(
1605    expr: &Expression,
1606    dialect: Option<DialectType>,
1607    refs: &mut Vec<SimpleColumnRef>,
1608) {
1609    let mut stack: Vec<&Expression> = vec![expr];
1610
1611    while let Some(current) = stack.pop() {
1612        match current {
1613            // === Leaf: collect Column references ===
1614            Expression::Column(col) => {
1615                refs.push(SimpleColumnRef {
1616                    table: col.table.clone(),
1617                    column: col.name.name.clone(),
1618                });
1619            }
1620
1621            // === Boundary: don't recurse into subqueries (handled separately) ===
1622            Expression::Subquery(_) | Expression::Exists(_) => {}
1623
1624            // === BinaryOp variants: left, right ===
1625            Expression::And(op)
1626            | Expression::Or(op)
1627            | Expression::Eq(op)
1628            | Expression::Neq(op)
1629            | Expression::Lt(op)
1630            | Expression::Lte(op)
1631            | Expression::Gt(op)
1632            | Expression::Gte(op)
1633            | Expression::Add(op)
1634            | Expression::Sub(op)
1635            | Expression::Mul(op)
1636            | Expression::Div(op)
1637            | Expression::Mod(op)
1638            | Expression::BitwiseAnd(op)
1639            | Expression::BitwiseOr(op)
1640            | Expression::BitwiseXor(op)
1641            | Expression::BitwiseLeftShift(op)
1642            | Expression::BitwiseRightShift(op)
1643            | Expression::Concat(op)
1644            | Expression::Adjacent(op)
1645            | Expression::TsMatch(op)
1646            | Expression::PropertyEQ(op)
1647            | Expression::ArrayContainsAll(op)
1648            | Expression::ArrayContainedBy(op)
1649            | Expression::ArrayOverlaps(op)
1650            | Expression::JSONBContainsAllTopKeys(op)
1651            | Expression::JSONBContainsAnyTopKeys(op)
1652            | Expression::JSONBDeleteAtPath(op)
1653            | Expression::ExtendsLeft(op)
1654            | Expression::ExtendsRight(op)
1655            | Expression::Is(op)
1656            | Expression::MemberOf(op)
1657            | Expression::NullSafeEq(op)
1658            | Expression::NullSafeNeq(op)
1659            | Expression::Glob(op)
1660            | Expression::Match(op) => {
1661                stack.push(&op.left);
1662                stack.push(&op.right);
1663            }
1664
1665            // === UnaryOp variants: this ===
1666            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1667                stack.push(&u.this);
1668            }
1669
1670            // === UnaryFunc variants: this ===
1671            Expression::Upper(f)
1672            | Expression::Lower(f)
1673            | Expression::Length(f)
1674            | Expression::LTrim(f)
1675            | Expression::RTrim(f)
1676            | Expression::Reverse(f)
1677            | Expression::Abs(f)
1678            | Expression::Sqrt(f)
1679            | Expression::Cbrt(f)
1680            | Expression::Ln(f)
1681            | Expression::Exp(f)
1682            | Expression::Sign(f)
1683            | Expression::Date(f)
1684            | Expression::Time(f)
1685            | Expression::DateFromUnixDate(f)
1686            | Expression::UnixDate(f)
1687            | Expression::UnixSeconds(f)
1688            | Expression::UnixMillis(f)
1689            | Expression::UnixMicros(f)
1690            | Expression::TimeStrToDate(f)
1691            | Expression::DateToDi(f)
1692            | Expression::DiToDate(f)
1693            | Expression::TsOrDiToDi(f)
1694            | Expression::TsOrDsToDatetime(f)
1695            | Expression::TsOrDsToTimestamp(f)
1696            | Expression::YearOfWeek(f)
1697            | Expression::YearOfWeekIso(f)
1698            | Expression::Initcap(f)
1699            | Expression::Ascii(f)
1700            | Expression::Chr(f)
1701            | Expression::Soundex(f)
1702            | Expression::ByteLength(f)
1703            | Expression::Hex(f)
1704            | Expression::LowerHex(f)
1705            | Expression::Unicode(f)
1706            | Expression::Radians(f)
1707            | Expression::Degrees(f)
1708            | Expression::Sin(f)
1709            | Expression::Cos(f)
1710            | Expression::Tan(f)
1711            | Expression::Asin(f)
1712            | Expression::Acos(f)
1713            | Expression::Atan(f)
1714            | Expression::IsNan(f)
1715            | Expression::IsInf(f)
1716            | Expression::ArrayLength(f)
1717            | Expression::ArraySize(f)
1718            | Expression::Cardinality(f)
1719            | Expression::ArrayReverse(f)
1720            | Expression::ArrayDistinct(f)
1721            | Expression::ArrayFlatten(f)
1722            | Expression::ArrayCompact(f)
1723            | Expression::Explode(f)
1724            | Expression::ExplodeOuter(f)
1725            | Expression::ToArray(f)
1726            | Expression::MapFromEntries(f)
1727            | Expression::MapKeys(f)
1728            | Expression::MapValues(f)
1729            | Expression::JsonArrayLength(f)
1730            | Expression::JsonKeys(f)
1731            | Expression::JsonType(f)
1732            | Expression::ParseJson(f)
1733            | Expression::ToJson(f)
1734            | Expression::Typeof(f)
1735            | Expression::BitwiseCount(f)
1736            | Expression::Year(f)
1737            | Expression::Month(f)
1738            | Expression::Day(f)
1739            | Expression::Hour(f)
1740            | Expression::Minute(f)
1741            | Expression::Second(f)
1742            | Expression::DayOfWeek(f)
1743            | Expression::DayOfWeekIso(f)
1744            | Expression::DayOfMonth(f)
1745            | Expression::DayOfYear(f)
1746            | Expression::WeekOfYear(f)
1747            | Expression::Quarter(f)
1748            | Expression::Epoch(f)
1749            | Expression::EpochMs(f)
1750            | Expression::TimeStrToUnix(f)
1751            | Expression::SHA(f)
1752            | Expression::SHA1Digest(f)
1753            | Expression::TimeToUnix(f)
1754            | Expression::JSONBool(f)
1755            | Expression::Int64(f)
1756            | Expression::MD5NumberLower64(f)
1757            | Expression::MD5NumberUpper64(f)
1758            | Expression::DateStrToDate(f)
1759            | Expression::DateToDateStr(f) => {
1760                stack.push(&f.this);
1761            }
1762
1763            // === BinaryFunc variants: this, expression ===
1764            Expression::Power(f)
1765            | Expression::NullIf(f)
1766            | Expression::IfNull(f)
1767            | Expression::Nvl(f)
1768            | Expression::UnixToTimeStr(f)
1769            | Expression::Contains(f)
1770            | Expression::StartsWith(f)
1771            | Expression::EndsWith(f)
1772            | Expression::Levenshtein(f)
1773            | Expression::ModFunc(f)
1774            | Expression::Atan2(f)
1775            | Expression::IntDiv(f)
1776            | Expression::AddMonths(f)
1777            | Expression::MonthsBetween(f)
1778            | Expression::NextDay(f)
1779            | Expression::ArrayContains(f)
1780            | Expression::ArrayPosition(f)
1781            | Expression::ArrayAppend(f)
1782            | Expression::ArrayPrepend(f)
1783            | Expression::ArrayUnion(f)
1784            | Expression::ArrayExcept(f)
1785            | Expression::ArrayRemove(f)
1786            | Expression::StarMap(f)
1787            | Expression::MapFromArrays(f)
1788            | Expression::MapContainsKey(f)
1789            | Expression::ElementAt(f)
1790            | Expression::JsonMergePatch(f)
1791            | Expression::JSONBContains(f)
1792            | Expression::JSONBExtract(f) => {
1793                stack.push(&f.this);
1794                stack.push(&f.expression);
1795            }
1796
1797            // === VarArgFunc variants: expressions ===
1798            Expression::Greatest(f)
1799            | Expression::Least(f)
1800            | Expression::Coalesce(f)
1801            | Expression::ArrayConcat(f)
1802            | Expression::ArrayIntersect(f)
1803            | Expression::ArrayZip(f)
1804            | Expression::MapConcat(f)
1805            | Expression::JsonArray(f) => {
1806                for e in &f.expressions {
1807                    stack.push(e);
1808                }
1809            }
1810
1811            // === AggFunc variants: this, filter, having_max, limit ===
1812            Expression::Sum(f)
1813            | Expression::Avg(f)
1814            | Expression::Min(f)
1815            | Expression::Max(f)
1816            | Expression::ArrayAgg(f)
1817            | Expression::CountIf(f)
1818            | Expression::Stddev(f)
1819            | Expression::StddevPop(f)
1820            | Expression::StddevSamp(f)
1821            | Expression::Variance(f)
1822            | Expression::VarPop(f)
1823            | Expression::VarSamp(f)
1824            | Expression::Median(f)
1825            | Expression::Mode(f)
1826            | Expression::First(f)
1827            | Expression::Last(f)
1828            | Expression::AnyValue(f)
1829            | Expression::ApproxDistinct(f)
1830            | Expression::ApproxCountDistinct(f)
1831            | Expression::LogicalAnd(f)
1832            | Expression::LogicalOr(f)
1833            | Expression::Skewness(f)
1834            | Expression::ArrayConcatAgg(f)
1835            | Expression::ArrayUniqueAgg(f)
1836            | Expression::BoolXorAgg(f)
1837            | Expression::BitwiseAndAgg(f)
1838            | Expression::BitwiseOrAgg(f)
1839            | Expression::BitwiseXorAgg(f) => {
1840                stack.push(&f.this);
1841                if let Some(ref filter) = f.filter {
1842                    stack.push(filter);
1843                }
1844                if let Some((ref expr, _)) = f.having_max {
1845                    stack.push(expr);
1846                }
1847                if let Some(ref limit) = f.limit {
1848                    stack.push(limit);
1849                }
1850            }
1851
1852            // === Generic Function / AggregateFunction: args ===
1853            Expression::Function(func) => {
1854                for arg in &func.args {
1855                    stack.push(arg);
1856                }
1857            }
1858            Expression::AggregateFunction(func) => {
1859                for arg in &func.args {
1860                    stack.push(arg);
1861                }
1862                if let Some(ref filter) = func.filter {
1863                    stack.push(filter);
1864                }
1865                if let Some(ref limit) = func.limit {
1866                    stack.push(limit);
1867                }
1868            }
1869
1870            // === WindowFunction: this (skip Over for lineage purposes) ===
1871            Expression::WindowFunction(wf) => {
1872                stack.push(&wf.this);
1873            }
1874
1875            // === Containers and special expressions ===
1876            Expression::Alias(a) => {
1877                stack.push(&a.this);
1878            }
1879            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1880                stack.push(&c.this);
1881                if let Some(ref fmt) = c.format {
1882                    stack.push(fmt);
1883                }
1884                if let Some(ref def) = c.default {
1885                    stack.push(def);
1886                }
1887            }
1888            Expression::Paren(p) => {
1889                stack.push(&p.this);
1890            }
1891            Expression::Annotated(a) => {
1892                stack.push(&a.this);
1893            }
1894            Expression::Case(case) => {
1895                if let Some(ref operand) = case.operand {
1896                    stack.push(operand);
1897                }
1898                for (cond, result) in &case.whens {
1899                    stack.push(cond);
1900                    stack.push(result);
1901                }
1902                if let Some(ref else_expr) = case.else_ {
1903                    stack.push(else_expr);
1904                }
1905            }
1906            Expression::Collation(c) => {
1907                stack.push(&c.this);
1908            }
1909            Expression::In(i) => {
1910                stack.push(&i.this);
1911                for e in &i.expressions {
1912                    stack.push(e);
1913                }
1914                if let Some(ref q) = i.query {
1915                    stack.push(q);
1916                }
1917                if let Some(ref u) = i.unnest {
1918                    stack.push(u);
1919                }
1920            }
1921            Expression::Between(b) => {
1922                stack.push(&b.this);
1923                stack.push(&b.low);
1924                stack.push(&b.high);
1925            }
1926            Expression::IsNull(n) => {
1927                stack.push(&n.this);
1928            }
1929            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1930                stack.push(&t.this);
1931            }
1932            Expression::IsJson(j) => {
1933                stack.push(&j.this);
1934            }
1935            Expression::Like(l) | Expression::ILike(l) => {
1936                stack.push(&l.left);
1937                stack.push(&l.right);
1938                if let Some(ref esc) = l.escape {
1939                    stack.push(esc);
1940                }
1941            }
1942            Expression::SimilarTo(s) => {
1943                stack.push(&s.this);
1944                stack.push(&s.pattern);
1945                if let Some(ref esc) = s.escape {
1946                    stack.push(esc);
1947                }
1948            }
1949            Expression::Ordered(o) => {
1950                stack.push(&o.this);
1951            }
1952            Expression::Array(a) => {
1953                for e in &a.expressions {
1954                    stack.push(e);
1955                }
1956            }
1957            Expression::Tuple(t) => {
1958                for e in &t.expressions {
1959                    stack.push(e);
1960                }
1961            }
1962            Expression::Struct(s) => {
1963                for (_, e) in &s.fields {
1964                    stack.push(e);
1965                }
1966            }
1967            Expression::Subscript(s) => {
1968                stack.push(&s.this);
1969                stack.push(&s.index);
1970            }
1971            Expression::Dot(d) => {
1972                stack.push(&d.this);
1973            }
1974            Expression::MethodCall(m) => {
1975                if !matches!(dialect, Some(DialectType::BigQuery))
1976                    || !is_bigquery_safe_namespace_receiver(&m.this)
1977                {
1978                    stack.push(&m.this);
1979                }
1980                for arg in &m.args {
1981                    stack.push(arg);
1982                }
1983            }
1984            Expression::ArraySlice(s) => {
1985                stack.push(&s.this);
1986                if let Some(ref start) = s.start {
1987                    stack.push(start);
1988                }
1989                if let Some(ref end) = s.end {
1990                    stack.push(end);
1991                }
1992            }
1993            Expression::Lambda(l) => {
1994                stack.push(&l.body);
1995            }
1996            Expression::NamedArgument(n) => {
1997                stack.push(&n.value);
1998            }
1999            Expression::Lateral(l) => {
2000                stack.push(&l.this);
2001                if let Some(ref view) = l.view {
2002                    stack.push(view);
2003                }
2004                if let Some(ref outer) = l.outer {
2005                    stack.push(outer);
2006                }
2007                if let Some(ref ordinality) = l.ordinality {
2008                    stack.push(ordinality);
2009                }
2010            }
2011            Expression::LateralView(lv) => {
2012                stack.push(&lv.this);
2013            }
2014            Expression::TryCatch(t) => {
2015                for stmt in &t.try_body {
2016                    stack.push(stmt);
2017                }
2018                if let Some(catch_body) = &t.catch_body {
2019                    for stmt in catch_body {
2020                        stack.push(stmt);
2021                    }
2022                }
2023            }
2024            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2025                stack.push(e);
2026            }
2027
2028            // === Custom function structs ===
2029            Expression::Substring(f) => {
2030                stack.push(&f.this);
2031                stack.push(&f.start);
2032                if let Some(ref len) = f.length {
2033                    stack.push(len);
2034                }
2035            }
2036            Expression::Trim(f) => {
2037                stack.push(&f.this);
2038                if let Some(ref chars) = f.characters {
2039                    stack.push(chars);
2040                }
2041            }
2042            Expression::Replace(f) => {
2043                stack.push(&f.this);
2044                stack.push(&f.old);
2045                stack.push(&f.new);
2046            }
2047            Expression::IfFunc(f) => {
2048                stack.push(&f.condition);
2049                stack.push(&f.true_value);
2050                if let Some(ref fv) = f.false_value {
2051                    stack.push(fv);
2052                }
2053            }
2054            Expression::Nvl2(f) => {
2055                stack.push(&f.this);
2056                stack.push(&f.true_value);
2057                stack.push(&f.false_value);
2058            }
2059            Expression::ConcatWs(f) => {
2060                stack.push(&f.separator);
2061                for e in &f.expressions {
2062                    stack.push(e);
2063                }
2064            }
2065            Expression::Count(f) => {
2066                if let Some(ref this) = f.this {
2067                    stack.push(this);
2068                }
2069                if let Some(ref filter) = f.filter {
2070                    stack.push(filter);
2071                }
2072            }
2073            Expression::GroupConcat(f) => {
2074                stack.push(&f.this);
2075                if let Some(ref sep) = f.separator {
2076                    stack.push(sep);
2077                }
2078                if let Some(ref filter) = f.filter {
2079                    stack.push(filter);
2080                }
2081            }
2082            Expression::StringAgg(f) => {
2083                stack.push(&f.this);
2084                if let Some(ref sep) = f.separator {
2085                    stack.push(sep);
2086                }
2087                if let Some(ref filter) = f.filter {
2088                    stack.push(filter);
2089                }
2090                if let Some(ref limit) = f.limit {
2091                    stack.push(limit);
2092                }
2093            }
2094            Expression::ListAgg(f) => {
2095                stack.push(&f.this);
2096                if let Some(ref sep) = f.separator {
2097                    stack.push(sep);
2098                }
2099                if let Some(ref filter) = f.filter {
2100                    stack.push(filter);
2101                }
2102            }
2103            Expression::SumIf(f) => {
2104                stack.push(&f.this);
2105                stack.push(&f.condition);
2106                if let Some(ref filter) = f.filter {
2107                    stack.push(filter);
2108                }
2109            }
2110            Expression::DateAdd(f) | Expression::DateSub(f) => {
2111                stack.push(&f.this);
2112                stack.push(&f.interval);
2113            }
2114            Expression::DateDiff(f) => {
2115                stack.push(&f.this);
2116                stack.push(&f.expression);
2117            }
2118            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2119                stack.push(&f.this);
2120            }
2121            Expression::Extract(f) => {
2122                stack.push(&f.this);
2123            }
2124            Expression::Round(f) => {
2125                stack.push(&f.this);
2126                if let Some(ref d) = f.decimals {
2127                    stack.push(d);
2128                }
2129            }
2130            Expression::Floor(f) => {
2131                stack.push(&f.this);
2132                if let Some(ref s) = f.scale {
2133                    stack.push(s);
2134                }
2135                if let Some(ref t) = f.to {
2136                    stack.push(t);
2137                }
2138            }
2139            Expression::Ceil(f) => {
2140                stack.push(&f.this);
2141                if let Some(ref d) = f.decimals {
2142                    stack.push(d);
2143                }
2144                if let Some(ref t) = f.to {
2145                    stack.push(t);
2146                }
2147            }
2148            Expression::Log(f) => {
2149                stack.push(&f.this);
2150                if let Some(ref b) = f.base {
2151                    stack.push(b);
2152                }
2153            }
2154            Expression::AtTimeZone(f) => {
2155                stack.push(&f.this);
2156                stack.push(&f.zone);
2157            }
2158            Expression::Lead(f) | Expression::Lag(f) => {
2159                stack.push(&f.this);
2160                if let Some(ref off) = f.offset {
2161                    stack.push(off);
2162                }
2163                if let Some(ref def) = f.default {
2164                    stack.push(def);
2165                }
2166            }
2167            Expression::FirstValue(f) | Expression::LastValue(f) => {
2168                stack.push(&f.this);
2169            }
2170            Expression::NthValue(f) => {
2171                stack.push(&f.this);
2172                stack.push(&f.offset);
2173            }
2174            Expression::Position(f) => {
2175                stack.push(&f.substring);
2176                stack.push(&f.string);
2177                if let Some(ref start) = f.start {
2178                    stack.push(start);
2179                }
2180            }
2181            Expression::Decode(f) => {
2182                stack.push(&f.this);
2183                for (search, result) in &f.search_results {
2184                    stack.push(search);
2185                    stack.push(result);
2186                }
2187                if let Some(ref def) = f.default {
2188                    stack.push(def);
2189                }
2190            }
2191            Expression::CharFunc(f) => {
2192                for arg in &f.args {
2193                    stack.push(arg);
2194                }
2195            }
2196            Expression::ArraySort(f) => {
2197                stack.push(&f.this);
2198                if let Some(ref cmp) = f.comparator {
2199                    stack.push(cmp);
2200                }
2201            }
2202            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2203                stack.push(&f.this);
2204                stack.push(&f.separator);
2205                if let Some(ref nr) = f.null_replacement {
2206                    stack.push(nr);
2207                }
2208            }
2209            Expression::ArrayFilter(f) => {
2210                stack.push(&f.this);
2211                stack.push(&f.filter);
2212            }
2213            Expression::ArrayTransform(f) => {
2214                stack.push(&f.this);
2215                stack.push(&f.transform);
2216            }
2217            Expression::Sequence(f)
2218            | Expression::Generate(f)
2219            | Expression::ExplodingGenerateSeries(f) => {
2220                stack.push(&f.start);
2221                stack.push(&f.stop);
2222                if let Some(ref step) = f.step {
2223                    stack.push(step);
2224                }
2225            }
2226            Expression::JsonExtract(f)
2227            | Expression::JsonExtractScalar(f)
2228            | Expression::JsonQuery(f)
2229            | Expression::JsonValue(f) => {
2230                stack.push(&f.this);
2231                stack.push(&f.path);
2232            }
2233            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2234                stack.push(&f.this);
2235                for p in &f.paths {
2236                    stack.push(p);
2237                }
2238            }
2239            Expression::JsonObject(f) => {
2240                for (k, v) in &f.pairs {
2241                    stack.push(k);
2242                    stack.push(v);
2243                }
2244            }
2245            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2246                stack.push(&f.this);
2247                for (path, val) in &f.path_values {
2248                    stack.push(path);
2249                    stack.push(val);
2250                }
2251            }
2252            Expression::Overlay(f) => {
2253                stack.push(&f.this);
2254                stack.push(&f.replacement);
2255                stack.push(&f.from);
2256                if let Some(ref len) = f.length {
2257                    stack.push(len);
2258                }
2259            }
2260            Expression::Convert(f) => {
2261                stack.push(&f.this);
2262                if let Some(ref style) = f.style {
2263                    stack.push(style);
2264                }
2265            }
2266            Expression::ApproxPercentile(f) => {
2267                stack.push(&f.this);
2268                stack.push(&f.percentile);
2269                if let Some(ref acc) = f.accuracy {
2270                    stack.push(acc);
2271                }
2272                if let Some(ref filter) = f.filter {
2273                    stack.push(filter);
2274                }
2275            }
2276            Expression::Percentile(f)
2277            | Expression::PercentileCont(f)
2278            | Expression::PercentileDisc(f) => {
2279                stack.push(&f.this);
2280                stack.push(&f.percentile);
2281                if let Some(ref filter) = f.filter {
2282                    stack.push(filter);
2283                }
2284            }
2285            Expression::WithinGroup(f) => {
2286                stack.push(&f.this);
2287            }
2288            Expression::Left(f) | Expression::Right(f) => {
2289                stack.push(&f.this);
2290                stack.push(&f.length);
2291            }
2292            Expression::Repeat(f) => {
2293                stack.push(&f.this);
2294                stack.push(&f.times);
2295            }
2296            Expression::Lpad(f) | Expression::Rpad(f) => {
2297                stack.push(&f.this);
2298                stack.push(&f.length);
2299                if let Some(ref fill) = f.fill {
2300                    stack.push(fill);
2301                }
2302            }
2303            Expression::Split(f) => {
2304                stack.push(&f.this);
2305                stack.push(&f.delimiter);
2306            }
2307            Expression::RegexpLike(f) => {
2308                stack.push(&f.this);
2309                stack.push(&f.pattern);
2310                if let Some(ref flags) = f.flags {
2311                    stack.push(flags);
2312                }
2313            }
2314            Expression::RegexpReplace(f) => {
2315                stack.push(&f.this);
2316                stack.push(&f.pattern);
2317                stack.push(&f.replacement);
2318                if let Some(ref flags) = f.flags {
2319                    stack.push(flags);
2320                }
2321            }
2322            Expression::RegexpExtract(f) => {
2323                stack.push(&f.this);
2324                stack.push(&f.pattern);
2325                if let Some(ref group) = f.group {
2326                    stack.push(group);
2327                }
2328            }
2329            Expression::ToDate(f) => {
2330                stack.push(&f.this);
2331                if let Some(ref fmt) = f.format {
2332                    stack.push(fmt);
2333                }
2334            }
2335            Expression::ToTimestamp(f) => {
2336                stack.push(&f.this);
2337                if let Some(ref fmt) = f.format {
2338                    stack.push(fmt);
2339                }
2340            }
2341            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2342                stack.push(&f.this);
2343                stack.push(&f.format);
2344            }
2345            Expression::LastDay(f) => {
2346                stack.push(&f.this);
2347            }
2348            Expression::FromUnixtime(f) => {
2349                stack.push(&f.this);
2350                if let Some(ref fmt) = f.format {
2351                    stack.push(fmt);
2352                }
2353            }
2354            Expression::UnixTimestamp(f) => {
2355                if let Some(ref this) = f.this {
2356                    stack.push(this);
2357                }
2358                if let Some(ref fmt) = f.format {
2359                    stack.push(fmt);
2360                }
2361            }
2362            Expression::MakeDate(f) => {
2363                stack.push(&f.year);
2364                stack.push(&f.month);
2365                stack.push(&f.day);
2366            }
2367            Expression::MakeTimestamp(f) => {
2368                stack.push(&f.year);
2369                stack.push(&f.month);
2370                stack.push(&f.day);
2371                stack.push(&f.hour);
2372                stack.push(&f.minute);
2373                stack.push(&f.second);
2374                if let Some(ref tz) = f.timezone {
2375                    stack.push(tz);
2376                }
2377            }
2378            Expression::TruncFunc(f) => {
2379                stack.push(&f.this);
2380                if let Some(ref d) = f.decimals {
2381                    stack.push(d);
2382                }
2383            }
2384            Expression::ArrayFunc(f) => {
2385                for e in &f.expressions {
2386                    stack.push(e);
2387                }
2388            }
2389            Expression::Unnest(f) => {
2390                stack.push(&f.this);
2391                for e in &f.expressions {
2392                    stack.push(e);
2393                }
2394            }
2395            Expression::StructFunc(f) => {
2396                for (_, e) in &f.fields {
2397                    stack.push(e);
2398                }
2399            }
2400            Expression::StructExtract(f) => {
2401                stack.push(&f.this);
2402            }
2403            Expression::NamedStruct(f) => {
2404                for (k, v) in &f.pairs {
2405                    stack.push(k);
2406                    stack.push(v);
2407                }
2408            }
2409            Expression::MapFunc(f) => {
2410                for k in &f.keys {
2411                    stack.push(k);
2412                }
2413                for v in &f.values {
2414                    stack.push(v);
2415                }
2416            }
2417            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2418                stack.push(&f.this);
2419                stack.push(&f.transform);
2420            }
2421            Expression::JsonArrayAgg(f) => {
2422                stack.push(&f.this);
2423                if let Some(ref filter) = f.filter {
2424                    stack.push(filter);
2425                }
2426            }
2427            Expression::JsonObjectAgg(f) => {
2428                stack.push(&f.key);
2429                stack.push(&f.value);
2430                if let Some(ref filter) = f.filter {
2431                    stack.push(filter);
2432                }
2433            }
2434            Expression::NTile(f) => {
2435                if let Some(ref n) = f.num_buckets {
2436                    stack.push(n);
2437                }
2438            }
2439            Expression::Rand(f) => {
2440                if let Some(ref s) = f.seed {
2441                    stack.push(s);
2442                }
2443                if let Some(ref lo) = f.lower {
2444                    stack.push(lo);
2445                }
2446                if let Some(ref hi) = f.upper {
2447                    stack.push(hi);
2448                }
2449            }
2450            Expression::Any(q) | Expression::All(q) => {
2451                stack.push(&q.this);
2452                stack.push(&q.subquery);
2453            }
2454            Expression::Overlaps(o) => {
2455                if let Some(ref this) = o.this {
2456                    stack.push(this);
2457                }
2458                if let Some(ref expr) = o.expression {
2459                    stack.push(expr);
2460                }
2461                if let Some(ref ls) = o.left_start {
2462                    stack.push(ls);
2463                }
2464                if let Some(ref le) = o.left_end {
2465                    stack.push(le);
2466                }
2467                if let Some(ref rs) = o.right_start {
2468                    stack.push(rs);
2469                }
2470                if let Some(ref re) = o.right_end {
2471                    stack.push(re);
2472                }
2473            }
2474            Expression::Interval(i) => {
2475                if let Some(ref this) = i.this {
2476                    stack.push(this);
2477                }
2478            }
2479            Expression::TimeStrToTime(f) => {
2480                stack.push(&f.this);
2481                if let Some(ref zone) = f.zone {
2482                    stack.push(zone);
2483                }
2484            }
2485            Expression::JSONBExtractScalar(f) => {
2486                stack.push(&f.this);
2487                stack.push(&f.expression);
2488                if let Some(ref jt) = f.json_type {
2489                    stack.push(jt);
2490                }
2491            }
2492            Expression::JSONExtract(f) => {
2493                stack.push(&f.this);
2494                stack.push(&f.expression);
2495                for e in &f.expressions {
2496                    stack.push(e);
2497                }
2498                if let Some(ref option) = f.option {
2499                    stack.push(option);
2500                }
2501                if let Some(ref on_condition) = f.on_condition {
2502                    stack.push(on_condition);
2503                }
2504            }
2505
2506            // === True leaves and non-expression-bearing nodes ===
2507            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2508            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2509            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2510            _ => {}
2511        }
2512    }
2513}
2514
2515// ---------------------------------------------------------------------------
2516// Tests
2517// ---------------------------------------------------------------------------
2518
2519#[cfg(test)]
2520mod tests {
2521    use super::*;
2522    use crate::dialects::{Dialect, DialectType};
2523    use crate::expressions::DataType;
2524    use crate::optimizer::annotate_types::annotate_types;
2525    use crate::parse_one;
2526    use crate::schema::{MappingSchema, Schema};
2527
2528    fn parse(sql: &str) -> Expression {
2529        let dialect = Dialect::get(DialectType::Generic);
2530        let ast = dialect.parse(sql).unwrap();
2531        ast.into_iter().next().unwrap()
2532    }
2533
2534    #[test]
2535    fn test_simple_lineage() {
2536        let expr = parse("SELECT a FROM t");
2537        let node = lineage("a", &expr, None, false).unwrap();
2538
2539        assert_eq!(node.name, "a");
2540        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2541        // Should trace to t.a
2542        let names = node.downstream_names();
2543        assert!(
2544            names.iter().any(|n| n == "t.a"),
2545            "Expected t.a in downstream, got: {:?}",
2546            names
2547        );
2548    }
2549
2550    #[test]
2551    fn test_lineage_walk() {
2552        let root = LineageNode {
2553            name: "col_a".to_string(),
2554            expression: Expression::Null(crate::expressions::Null),
2555            source: Expression::Null(crate::expressions::Null),
2556            downstream: vec![LineageNode::new(
2557                "t.a",
2558                Expression::Null(crate::expressions::Null),
2559                Expression::Null(crate::expressions::Null),
2560            )],
2561            source_name: String::new(),
2562            source_kind: SourceKind::Unknown,
2563            source_alias: None,
2564            reference_node_name: String::new(),
2565        };
2566
2567        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2568        assert_eq!(names.len(), 2);
2569        assert_eq!(names[0], "col_a");
2570        assert_eq!(names[1], "t.a");
2571    }
2572
2573    #[test]
2574    fn test_aliased_column() {
2575        let expr = parse("SELECT a + 1 AS b FROM t");
2576        let node = lineage("b", &expr, None, false).unwrap();
2577
2578        assert_eq!(node.name, "b");
2579        // Should trace through the expression to t.a
2580        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2581        assert!(
2582            all_names.iter().any(|n| n.contains("a")),
2583            "Expected to trace to column a, got: {:?}",
2584            all_names
2585        );
2586    }
2587
2588    #[test]
2589    fn test_qualified_column() {
2590        let expr = parse("SELECT t.a FROM t");
2591        let node = lineage("a", &expr, None, false).unwrap();
2592
2593        assert_eq!(node.name, "a");
2594        let names = node.downstream_names();
2595        assert!(
2596            names.iter().any(|n| n == "t.a"),
2597            "Expected t.a, got: {:?}",
2598            names
2599        );
2600    }
2601
2602    #[test]
2603    fn test_unqualified_column() {
2604        let expr = parse("SELECT a FROM t");
2605        let node = lineage("a", &expr, None, false).unwrap();
2606
2607        // Unqualified but single source → resolved to t.a
2608        let names = node.downstream_names();
2609        assert!(
2610            names.iter().any(|n| n == "t.a"),
2611            "Expected t.a, got: {:?}",
2612            names
2613        );
2614    }
2615
2616    #[test]
2617    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2618        let query = "SELECT name FROM users";
2619        let dialect = Dialect::get(DialectType::BigQuery);
2620        let expr = dialect
2621            .parse(query)
2622            .unwrap()
2623            .into_iter()
2624            .next()
2625            .expect("expected one expression");
2626
2627        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2628        schema
2629            .add_table("users", &[("name".into(), DataType::Text)], None)
2630            .expect("schema setup");
2631
2632        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2633            .expect("lineage without schema");
2634        let mut expr_without = node_without_schema.expression.clone();
2635        annotate_types(
2636            &mut expr_without,
2637            Some(&schema),
2638            Some(DialectType::BigQuery),
2639        );
2640        assert_eq!(
2641            expr_without.inferred_type(),
2642            None,
2643            "Expected unresolved root type without schema-aware lineage qualification"
2644        );
2645
2646        let node_with_schema = lineage_with_schema(
2647            "name",
2648            &expr,
2649            Some(&schema),
2650            Some(DialectType::BigQuery),
2651            false,
2652        )
2653        .expect("lineage with schema");
2654        let mut expr_with = node_with_schema.expression.clone();
2655        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2656
2657        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2658    }
2659
2660    #[test]
2661    fn test_lineage_with_schema_correlated_scalar_subquery() {
2662        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2663        let dialect = Dialect::get(DialectType::BigQuery);
2664        let expr = dialect
2665            .parse(query)
2666            .unwrap()
2667            .into_iter()
2668            .next()
2669            .expect("expected one expression");
2670
2671        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2672        schema
2673            .add_table(
2674                "t1",
2675                &[("id".into(), DataType::BigInt { length: None })],
2676                None,
2677            )
2678            .expect("schema setup");
2679        schema
2680            .add_table(
2681                "t2",
2682                &[
2683                    ("id".into(), DataType::BigInt { length: None }),
2684                    ("val".into(), DataType::BigInt { length: None }),
2685                ],
2686                None,
2687            )
2688            .expect("schema setup");
2689
2690        let node = lineage_with_schema(
2691            "id",
2692            &expr,
2693            Some(&schema),
2694            Some(DialectType::BigQuery),
2695            false,
2696        )
2697        .expect("lineage_with_schema should handle correlated scalar subqueries");
2698
2699        assert_eq!(node.name, "id");
2700    }
2701
2702    #[test]
2703    fn test_lineage_with_schema_join_using() {
2704        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2705        let dialect = Dialect::get(DialectType::BigQuery);
2706        let expr = dialect
2707            .parse(query)
2708            .unwrap()
2709            .into_iter()
2710            .next()
2711            .expect("expected one expression");
2712
2713        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2714        schema
2715            .add_table(
2716                "t1",
2717                &[("a".into(), DataType::BigInt { length: None })],
2718                None,
2719            )
2720            .expect("schema setup");
2721        schema
2722            .add_table(
2723                "t2",
2724                &[("a".into(), DataType::BigInt { length: None })],
2725                None,
2726            )
2727            .expect("schema setup");
2728
2729        let node = lineage_with_schema(
2730            "a",
2731            &expr,
2732            Some(&schema),
2733            Some(DialectType::BigQuery),
2734            false,
2735        )
2736        .expect("lineage_with_schema should handle JOIN USING");
2737
2738        assert_eq!(node.name, "a");
2739    }
2740
2741    #[test]
2742    fn test_lineage_with_schema_qualified_table_name() {
2743        let query = "SELECT a FROM raw.t1";
2744        let dialect = Dialect::get(DialectType::BigQuery);
2745        let expr = dialect
2746            .parse(query)
2747            .unwrap()
2748            .into_iter()
2749            .next()
2750            .expect("expected one expression");
2751
2752        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2753        schema
2754            .add_table(
2755                "raw.t1",
2756                &[("a".into(), DataType::BigInt { length: None })],
2757                None,
2758            )
2759            .expect("schema setup");
2760
2761        let node = lineage_with_schema(
2762            "a",
2763            &expr,
2764            Some(&schema),
2765            Some(DialectType::BigQuery),
2766            false,
2767        )
2768        .expect("lineage_with_schema should handle dotted schema.table names");
2769
2770        assert_eq!(node.name, "a");
2771    }
2772
2773    #[test]
2774    fn test_lineage_with_schema_none_matches_lineage() {
2775        let expr = parse("SELECT a FROM t");
2776        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2777        let with_none =
2778            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2779
2780        assert_eq!(with_none.name, baseline.name);
2781        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2782    }
2783
2784    #[test]
2785    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2786        let dialect = Dialect::get(DialectType::BigQuery);
2787        let expr = dialect
2788            .parse("SELECT Name AS name FROM teams")
2789            .unwrap()
2790            .into_iter()
2791            .next()
2792            .expect("expected one expression");
2793
2794        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2795        schema
2796            .add_table(
2797                "teams",
2798                &[("Name".into(), DataType::String { length: None })],
2799                None,
2800            )
2801            .expect("schema setup");
2802
2803        let node = lineage_with_schema(
2804            "name",
2805            &expr,
2806            Some(&schema),
2807            Some(DialectType::BigQuery),
2808            false,
2809        )
2810        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2811
2812        let names = node.downstream_names();
2813        assert!(
2814            names.iter().any(|n| n == "teams.Name"),
2815            "Expected teams.Name in downstream, got: {:?}",
2816            names
2817        );
2818    }
2819
2820    #[test]
2821    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2822        let dialect = Dialect::get(DialectType::BigQuery);
2823        let expr = dialect
2824            .parse("SELECT Name AS Name FROM teams")
2825            .unwrap()
2826            .into_iter()
2827            .next()
2828            .expect("expected one expression");
2829
2830        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2831            .expect("lineage should resolve mixed-case aliases in BigQuery");
2832
2833        assert_eq!(node.name, "name");
2834    }
2835
2836    #[test]
2837    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2838        let expr = parse_one(
2839            r#"
2840SELECT date_val AS week_start
2841FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2842"#,
2843            DialectType::BigQuery,
2844        )
2845        .expect("parse");
2846
2847        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2848            .expect("lineage should resolve UNNEST alias as a source");
2849        let child = node
2850            .downstream
2851            .first()
2852            .expect("week_start should have downstream lineage");
2853
2854        assert_eq!(child.name, "_0.date_val");
2855        assert_eq!(child.source_name, "_0");
2856        assert_eq!(child.source_kind, SourceKind::Virtual);
2857        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2858
2859        let Expression::Column(column) = &child.expression else {
2860            panic!(
2861                "expected downstream column expression, got {:?}",
2862                child.expression
2863            );
2864        };
2865        assert_eq!(column.name.name, "date_val");
2866        assert_eq!(
2867            column.table.as_ref().map(|table| table.name.as_str()),
2868            Some("_0")
2869        );
2870        assert!(
2871            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2872            "expected UNNEST source expression, got {:?}",
2873            child.source
2874        );
2875    }
2876
2877    #[test]
2878    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2879        let expr =
2880            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2881
2882        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2883        let child = node.downstream.first().expect("id should have lineage");
2884
2885        assert_eq!(child.name, "date_val.id");
2886        assert_eq!(child.source_name, "date_val");
2887        assert_eq!(child.source_kind, SourceKind::Table);
2888        assert_eq!(child.source_alias, None);
2889    }
2890
2891    #[test]
2892    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2893        let expr = parse_one(
2894            r#"
2895SELECT a.a AS first_value, b.b AS second_value
2896FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2897JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2898"#,
2899            DialectType::BigQuery,
2900        )
2901        .expect("parse");
2902
2903        let first =
2904            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2905        let second =
2906            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2907
2908        let first_child = first.downstream.first().expect("first source");
2909        let second_child = second.downstream.first().expect("second source");
2910
2911        assert_eq!(first_child.name, "_0.a");
2912        assert_eq!(first_child.source_name, "_0");
2913        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2914        assert_eq!(first_child.source_kind, SourceKind::Virtual);
2915
2916        assert_eq!(second_child.name, "_1.b");
2917        assert_eq!(second_child.source_name, "_1");
2918        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2919        assert_eq!(second_child.source_kind, SourceKind::Virtual);
2920    }
2921
2922    #[test]
2923    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2924        let expr = parse_one(
2925            r#"
2926SELECT item.item AS item
2927FROM t JOIN UNNEST(t.items) AS item ON TRUE
2928"#,
2929            DialectType::BigQuery,
2930        )
2931        .expect("parse");
2932
2933        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2934        let virtual_child = node.downstream.first().expect("virtual item source");
2935        assert_eq!(virtual_child.name, "_0.item");
2936        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2937
2938        let real_child = virtual_child
2939            .downstream
2940            .first()
2941            .expect("UNNEST(t.items) should depend on t.items");
2942        assert_eq!(real_child.name, "t.items");
2943        assert_eq!(real_child.source_name, "t");
2944        assert_eq!(real_child.source_kind, SourceKind::Table);
2945    }
2946
2947    #[test]
2948    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
2949        let expr = parse_one(
2950            r#"
2951SELECT item AS item
2952FROM t JOIN UNNEST(t.items) AS item ON TRUE
2953"#,
2954            DialectType::BigQuery,
2955        )
2956        .expect("parse");
2957
2958        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2959        let virtual_child = node.downstream.first().expect("virtual item source");
2960        assert_eq!(virtual_child.name, "_0.item");
2961        assert_eq!(virtual_child.source_name, "_0");
2962        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2963        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
2964
2965        let real_child = virtual_child
2966            .downstream
2967            .first()
2968            .expect("UNNEST(t.items) should depend on t.items");
2969        assert_eq!(real_child.name, "t.items");
2970        assert_eq!(real_child.source_name, "t");
2971        assert_eq!(real_child.source_kind, SourceKind::Table);
2972    }
2973
2974    #[test]
2975    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
2976        let cases = [
2977            (
2978                DialectType::PostgreSQL,
2979                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
2980            ),
2981            (
2982                DialectType::Presto,
2983                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
2984            ),
2985            (
2986                DialectType::Trino,
2987                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
2988            ),
2989        ];
2990
2991        for (dialect, sql) in cases {
2992            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
2993            let node = lineage("out", &expr, Some(dialect), false)
2994                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
2995            let virtual_child = node
2996                .downstream
2997                .first()
2998                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
2999
3000            assert_eq!(
3001                virtual_child.name, "_0.x",
3002                "unexpected virtual child for {dialect:?}"
3003            );
3004            assert_eq!(virtual_child.source_name, "_0");
3005            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3006            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3007
3008            let real_child = virtual_child
3009                .downstream
3010                .first()
3011                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3012            assert_eq!(real_child.name, "t.items");
3013            assert_eq!(real_child.source_kind, SourceKind::Table);
3014        }
3015    }
3016
3017    #[test]
3018    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3019        let cases = [
3020            (
3021                DialectType::Spark,
3022                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3023            ),
3024            (
3025                DialectType::Hive,
3026                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3027            ),
3028        ];
3029
3030        for (dialect, sql) in cases {
3031            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3032            let node = lineage("out", &expr, Some(dialect), false)
3033                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3034            let virtual_child = node
3035                .downstream
3036                .first()
3037                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3038
3039            assert_eq!(virtual_child.name, "_0.x");
3040            assert_eq!(virtual_child.source_name, "_0");
3041            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3042            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3043
3044            let real_child = virtual_child
3045                .downstream
3046                .first()
3047                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3048            assert_eq!(real_child.name, "t.items");
3049            assert_eq!(real_child.source_kind, SourceKind::Table);
3050        }
3051    }
3052
3053    #[test]
3054    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3055        let expr = parse_one(
3056            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3057            DialectType::Snowflake,
3058        )
3059        .expect("parse");
3060
3061        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3062        let virtual_child = node.downstream.first().expect("virtual flatten source");
3063        assert_eq!(virtual_child.name, "_0.value");
3064        assert_eq!(virtual_child.source_name, "_0");
3065        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3066        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3067
3068        let real_child = virtual_child
3069            .downstream
3070            .first()
3071            .expect("FLATTEN input should depend on raw_events.payload");
3072        assert_eq!(real_child.name, "raw_events.payload");
3073        assert_eq!(real_child.source_kind, SourceKind::Table);
3074    }
3075
3076    #[test]
3077    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3078        let expr = parse_one(
3079            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3080            DialectType::Snowflake,
3081        )
3082        .expect("parse");
3083
3084        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3085        schema
3086            .add_table(
3087                "fact.some_daily_metrics",
3088                &[("date_utc".to_string(), DataType::Date)],
3089                None,
3090            )
3091            .expect("schema setup");
3092
3093        let node = lineage_with_schema(
3094            "recency",
3095            &expr,
3096            Some(&schema),
3097            Some(DialectType::Snowflake),
3098            false,
3099        )
3100        .expect("lineage_with_schema should not treat date part as a column");
3101
3102        let names = node.downstream_names();
3103        assert!(
3104            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3105            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3106            names
3107        );
3108        assert!(
3109            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3110            "Did not expect date part to appear as lineage column, got: {:?}",
3111            names
3112        );
3113    }
3114
3115    #[test]
3116    fn test_snowflake_datediff_parses_to_typed_ast() {
3117        let expr = parse_one(
3118            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3119            DialectType::Snowflake,
3120        )
3121        .expect("parse");
3122
3123        match expr {
3124            Expression::Select(select) => match &select.expressions[0] {
3125                Expression::Alias(alias) => match &alias.this {
3126                    Expression::DateDiff(f) => {
3127                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3128                    }
3129                    other => panic!("expected DateDiff, got {other:?}"),
3130                },
3131                other => panic!("expected Alias, got {other:?}"),
3132            },
3133            other => panic!("expected Select, got {other:?}"),
3134        }
3135    }
3136
3137    #[test]
3138    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3139        let expr = parse_one(
3140            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3141            DialectType::Snowflake,
3142        )
3143        .expect("parse");
3144
3145        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3146        schema
3147            .add_table(
3148                "fact.some_daily_metrics",
3149                &[("date_utc".to_string(), DataType::Date)],
3150                None,
3151            )
3152            .expect("schema setup");
3153
3154        let node = lineage_with_schema(
3155            "next_day",
3156            &expr,
3157            Some(&schema),
3158            Some(DialectType::Snowflake),
3159            false,
3160        )
3161        .expect("lineage_with_schema should not treat DATEADD date part as a column");
3162
3163        let names = node.downstream_names();
3164        assert!(
3165            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3166            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3167            names
3168        );
3169        assert!(
3170            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3171            "Did not expect date part to appear as lineage column, got: {:?}",
3172            names
3173        );
3174    }
3175
3176    #[test]
3177    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3178        let expr = parse_one(
3179            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3180            DialectType::Snowflake,
3181        )
3182        .expect("parse");
3183
3184        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3185        schema
3186            .add_table(
3187                "fact.some_daily_metrics",
3188                &[("date_utc".to_string(), DataType::Date)],
3189                None,
3190            )
3191            .expect("schema setup");
3192
3193        let node = lineage_with_schema(
3194            "day_part",
3195            &expr,
3196            Some(&schema),
3197            Some(DialectType::Snowflake),
3198            false,
3199        )
3200        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3201
3202        let names = node.downstream_names();
3203        assert!(
3204            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3205            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3206            names
3207        );
3208        assert!(
3209            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3210            "Did not expect date part to appear as lineage column, got: {:?}",
3211            names
3212        );
3213    }
3214
3215    #[test]
3216    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3217        let expr = parse_one(
3218            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3219            DialectType::Snowflake,
3220        )
3221        .expect("parse");
3222
3223        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3224        schema
3225            .add_table(
3226                "fact.some_daily_metrics",
3227                &[("date_utc".to_string(), DataType::Date)],
3228                None,
3229            )
3230            .expect("schema setup");
3231
3232        let node = lineage_with_schema(
3233            "day_part",
3234            &expr,
3235            Some(&schema),
3236            Some(DialectType::Snowflake),
3237            false,
3238        )
3239        .expect("quoted DATE_PART should continue to work");
3240
3241        let names = node.downstream_names();
3242        assert!(
3243            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3244            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3245            names
3246        );
3247    }
3248
3249    #[test]
3250    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3251        let expr = parse_one(
3252            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3253            DialectType::Snowflake,
3254        )
3255        .expect("parse");
3256
3257        match expr {
3258            Expression::Select(select) => match &select.expressions[0] {
3259                Expression::Alias(alias) => match &alias.this {
3260                    Expression::Function(f) => {
3261                        assert_eq!(f.name.to_uppercase(), "DATEADD");
3262                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3263                    }
3264                    other => panic!("expected generic DATEADD function, got {other:?}"),
3265                },
3266                other => panic!("expected Alias, got {other:?}"),
3267            },
3268            other => panic!("expected Select, got {other:?}"),
3269        }
3270    }
3271
3272    #[test]
3273    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3274        let expr = parse_one(
3275            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3276            DialectType::Snowflake,
3277        )
3278        .expect("parse");
3279
3280        match expr {
3281            Expression::Select(select) => match &select.expressions[0] {
3282                Expression::Alias(alias) => match &alias.this {
3283                    Expression::Function(f) => {
3284                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3285                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3286                    }
3287                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3288                },
3289                other => panic!("expected Alias, got {other:?}"),
3290            },
3291            other => panic!("expected Select, got {other:?}"),
3292        }
3293    }
3294
3295    #[test]
3296    fn test_snowflake_date_part_string_literal_stays_generic_function() {
3297        let expr = parse_one(
3298            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3299            DialectType::Snowflake,
3300        )
3301        .expect("parse");
3302
3303        match expr {
3304            Expression::Select(select) => match &select.expressions[0] {
3305                Expression::Alias(alias) => match &alias.this {
3306                    Expression::Function(f) => {
3307                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3308                    }
3309                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3310                },
3311                other => panic!("expected Alias, got {other:?}"),
3312            },
3313            other => panic!("expected Select, got {other:?}"),
3314        }
3315    }
3316
3317    #[test]
3318    fn test_lineage_join() {
3319        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3320
3321        let node_a = lineage("a", &expr, None, false).unwrap();
3322        let names_a = node_a.downstream_names();
3323        assert!(
3324            names_a.iter().any(|n| n == "t.a"),
3325            "Expected t.a, got: {:?}",
3326            names_a
3327        );
3328
3329        let node_b = lineage("b", &expr, None, false).unwrap();
3330        let names_b = node_b.downstream_names();
3331        assert!(
3332            names_b.iter().any(|n| n == "s.b"),
3333            "Expected s.b, got: {:?}",
3334            names_b
3335        );
3336    }
3337
3338    #[test]
3339    fn test_lineage_alias_leaf_has_resolved_source_name() {
3340        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3341        let node = lineage("col1", &expr, None, false).unwrap();
3342
3343        // Keep alias in the display lineage edge.
3344        let names = node.downstream_names();
3345        assert!(
3346            names.iter().any(|n| n == "t1.col1"),
3347            "Expected aliased column edge t1.col1, got: {:?}",
3348            names
3349        );
3350
3351        // Leaf should expose the resolved base table for consumers.
3352        let leaf = node
3353            .downstream
3354            .iter()
3355            .find(|n| n.name == "t1.col1")
3356            .expect("Expected t1.col1 leaf");
3357        assert_eq!(leaf.source_name, "table1");
3358        match &leaf.source {
3359            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3360            _ => panic!("Expected leaf source to be a table expression"),
3361        }
3362    }
3363
3364    #[test]
3365    fn test_lineage_derived_table() {
3366        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3367        let node = lineage("a", &expr, None, false).unwrap();
3368
3369        assert_eq!(node.name, "a");
3370        // Should trace through the derived table to t.a
3371        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3372        assert!(
3373            all_names.iter().any(|n| n == "t.a"),
3374            "Expected to trace through derived table to t.a, got: {:?}",
3375            all_names
3376        );
3377    }
3378
3379    #[test]
3380    fn test_lineage_cte() {
3381        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3382        let node = lineage("a", &expr, None, false).unwrap();
3383
3384        assert_eq!(node.name, "a");
3385        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3386        assert!(
3387            all_names.iter().any(|n| n == "t.a"),
3388            "Expected to trace through CTE to t.a, got: {:?}",
3389            all_names
3390        );
3391    }
3392
3393    #[test]
3394    fn test_lineage_union() {
3395        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3396        let node = lineage("a", &expr, None, false).unwrap();
3397
3398        assert_eq!(node.name, "a");
3399        // Should have 2 downstream branches
3400        assert_eq!(
3401            node.downstream.len(),
3402            2,
3403            "Expected 2 branches for UNION, got {}",
3404            node.downstream.len()
3405        );
3406    }
3407
3408    #[test]
3409    fn test_lineage_cte_union() {
3410        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3411        let node = lineage("a", &expr, None, false).unwrap();
3412
3413        // Should trace through CTE into both UNION branches
3414        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3415        assert!(
3416            all_names.len() >= 3,
3417            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3418            all_names
3419        );
3420    }
3421
3422    #[test]
3423    fn test_lineage_star() {
3424        let expr = parse("SELECT * FROM t");
3425        let node = lineage("*", &expr, None, false).unwrap();
3426
3427        assert_eq!(node.name, "*");
3428        // Should have downstream for table t
3429        assert!(
3430            !node.downstream.is_empty(),
3431            "Star should produce downstream nodes"
3432        );
3433    }
3434
3435    #[test]
3436    fn test_lineage_subquery_in_select() {
3437        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3438        let node = lineage("x", &expr, None, false).unwrap();
3439
3440        assert_eq!(node.name, "x");
3441        // Should have traced into the scalar subquery
3442        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3443        assert!(
3444            all_names.len() >= 2,
3445            "Expected tracing into scalar subquery, got: {:?}",
3446            all_names
3447        );
3448    }
3449
3450    #[test]
3451    fn test_lineage_multiple_columns() {
3452        let expr = parse("SELECT a, b FROM t");
3453
3454        let node_a = lineage("a", &expr, None, false).unwrap();
3455        let node_b = lineage("b", &expr, None, false).unwrap();
3456
3457        assert_eq!(node_a.name, "a");
3458        assert_eq!(node_b.name, "b");
3459
3460        // Each should trace independently
3461        let names_a = node_a.downstream_names();
3462        let names_b = node_b.downstream_names();
3463        assert!(names_a.iter().any(|n| n == "t.a"));
3464        assert!(names_b.iter().any(|n| n == "t.b"));
3465    }
3466
3467    #[test]
3468    fn test_get_source_tables() {
3469        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3470        let node = lineage("a", &expr, None, false).unwrap();
3471
3472        let tables = get_source_tables(&node);
3473        assert!(
3474            tables.contains("t"),
3475            "Expected source table 't', got: {:?}",
3476            tables
3477        );
3478    }
3479
3480    #[test]
3481    fn test_lineage_column_not_found() {
3482        let expr = parse("SELECT a FROM t");
3483        let result = lineage("nonexistent", &expr, None, false);
3484        assert!(result.is_err());
3485    }
3486
3487    #[test]
3488    fn test_lineage_nested_cte() {
3489        let expr = parse(
3490            "WITH cte1 AS (SELECT a FROM t), \
3491             cte2 AS (SELECT a FROM cte1) \
3492             SELECT a FROM cte2",
3493        );
3494        let node = lineage("a", &expr, None, false).unwrap();
3495
3496        // Should trace through cte2 → cte1 → t
3497        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3498        assert!(
3499            all_names.len() >= 3,
3500            "Expected to trace through nested CTEs, got: {:?}",
3501            all_names
3502        );
3503    }
3504
3505    #[test]
3506    fn test_trim_selects_true() {
3507        let expr = parse("SELECT a, b, c FROM t");
3508        let node = lineage("a", &expr, None, true).unwrap();
3509
3510        // The source should be trimmed to only include 'a'
3511        if let Expression::Select(select) = &node.source {
3512            assert_eq!(
3513                select.expressions.len(),
3514                1,
3515                "Trimmed source should have 1 expression, got {}",
3516                select.expressions.len()
3517            );
3518        } else {
3519            panic!("Expected Select source");
3520        }
3521    }
3522
3523    #[test]
3524    fn test_trim_selects_false() {
3525        let expr = parse("SELECT a, b, c FROM t");
3526        let node = lineage("a", &expr, None, false).unwrap();
3527
3528        // The source should keep all columns
3529        if let Expression::Select(select) = &node.source {
3530            assert_eq!(
3531                select.expressions.len(),
3532                3,
3533                "Untrimmed source should have 3 expressions"
3534            );
3535        } else {
3536            panic!("Expected Select source");
3537        }
3538    }
3539
3540    #[test]
3541    fn test_lineage_expression_in_select() {
3542        let expr = parse("SELECT a + b AS c FROM t");
3543        let node = lineage("c", &expr, None, false).unwrap();
3544
3545        // Should trace to both a and b from t
3546        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3547        assert!(
3548            all_names.len() >= 3,
3549            "Expected to trace a + b to both columns, got: {:?}",
3550            all_names
3551        );
3552    }
3553
3554    #[test]
3555    fn test_set_operation_by_index() {
3556        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3557
3558        // Trace column "a" which is at index 0
3559        let node = lineage("a", &expr, None, false).unwrap();
3560
3561        // UNION branches should be traced by index
3562        assert_eq!(node.downstream.len(), 2);
3563    }
3564
3565    // --- Tests for column lineage inside function calls (issue #18) ---
3566
3567    fn print_node(node: &LineageNode, indent: usize) {
3568        let pad = "  ".repeat(indent);
3569        println!(
3570            "{pad}name={:?} source_name={:?}",
3571            node.name, node.source_name
3572        );
3573        for child in &node.downstream {
3574            print_node(child, indent + 1);
3575        }
3576    }
3577
3578    #[test]
3579    fn test_issue18_repro() {
3580        // Exact scenario from the issue
3581        let query = "SELECT UPPER(name) as upper_name FROM users";
3582        println!("Query: {query}\n");
3583
3584        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3585        let exprs = dialect.parse(query).unwrap();
3586        let expr = &exprs[0];
3587
3588        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3589        println!("lineage(\"upper_name\"):");
3590        print_node(&node, 1);
3591
3592        let names = node.downstream_names();
3593        assert!(
3594            names.iter().any(|n| n == "users.name"),
3595            "Expected users.name in downstream, got: {:?}",
3596            names
3597        );
3598    }
3599
3600    #[test]
3601    fn test_lineage_bigquery_safe_namespace_issue207() {
3602        let query = r#"
3603WITH import_cte AS (
3604  SELECT timestamp, data, operation
3605  FROM `project`.`dataset`.`source_table`
3606),
3607transform_cte AS (
3608  SELECT
3609    timestamp,
3610    SAFE.PARSE_JSON(data) AS json_data
3611  FROM import_cte
3612)
3613SELECT json_data FROM transform_cte
3614"#;
3615        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3616        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3617            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3618        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3619
3620        assert!(
3621            names.iter().any(|name| name == "source_table.data"),
3622            "expected source_table.data in lineage, got {names:?}"
3623        );
3624        assert!(
3625            !names
3626                .iter()
3627                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3628            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3629        );
3630    }
3631
3632    #[test]
3633    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3634        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3635        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3636            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3637        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3638
3639        assert!(
3640            names.iter().any(|name| name == "t.data"),
3641            "expected t.data in lineage, got {names:?}"
3642        );
3643        assert!(
3644            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3645            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3646        );
3647    }
3648
3649    #[test]
3650    fn test_lineage_method_call_receiver_control() {
3651        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3652        let node = lineage("out", &expr, None, false).expect("lineage");
3653        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3654
3655        assert!(
3656            names.iter().any(|name| name == "t.obj"),
3657            "expected ordinary method receiver to remain in lineage, got {names:?}"
3658        );
3659        assert!(
3660            names.iter().any(|name| name == "t.arg"),
3661            "expected method argument in lineage, got {names:?}"
3662        );
3663    }
3664
3665    #[test]
3666    fn test_lineage_upper_function() {
3667        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3668        let node = lineage("upper_name", &expr, None, false).unwrap();
3669
3670        let names = node.downstream_names();
3671        assert!(
3672            names.iter().any(|n| n == "users.name"),
3673            "Expected users.name in downstream, got: {:?}",
3674            names
3675        );
3676    }
3677
3678    #[test]
3679    fn test_lineage_round_function() {
3680        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3681        let node = lineage("rounded", &expr, None, false).unwrap();
3682
3683        let names = node.downstream_names();
3684        assert!(
3685            names.iter().any(|n| n == "products.price"),
3686            "Expected products.price in downstream, got: {:?}",
3687            names
3688        );
3689    }
3690
3691    #[test]
3692    fn test_lineage_coalesce_function() {
3693        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3694        let node = lineage("val", &expr, None, false).unwrap();
3695
3696        let names = node.downstream_names();
3697        assert!(
3698            names.iter().any(|n| n == "t.a"),
3699            "Expected t.a in downstream, got: {:?}",
3700            names
3701        );
3702        assert!(
3703            names.iter().any(|n| n == "t.b"),
3704            "Expected t.b in downstream, got: {:?}",
3705            names
3706        );
3707    }
3708
3709    #[test]
3710    fn test_lineage_count_function() {
3711        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3712        let node = lineage("cnt", &expr, None, false).unwrap();
3713
3714        let names = node.downstream_names();
3715        assert!(
3716            names.iter().any(|n| n == "t.id"),
3717            "Expected t.id in downstream, got: {:?}",
3718            names
3719        );
3720    }
3721
3722    #[test]
3723    fn test_lineage_sum_function() {
3724        let expr = parse("SELECT SUM(amount) AS total FROM t");
3725        let node = lineage("total", &expr, None, false).unwrap();
3726
3727        let names = node.downstream_names();
3728        assert!(
3729            names.iter().any(|n| n == "t.amount"),
3730            "Expected t.amount in downstream, got: {:?}",
3731            names
3732        );
3733    }
3734
3735    #[test]
3736    fn test_lineage_case_with_nested_functions() {
3737        let expr =
3738            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3739        let node = lineage("result", &expr, None, false).unwrap();
3740
3741        let names = node.downstream_names();
3742        assert!(
3743            names.iter().any(|n| n == "t.x"),
3744            "Expected t.x in downstream, got: {:?}",
3745            names
3746        );
3747        assert!(
3748            names.iter().any(|n| n == "t.name"),
3749            "Expected t.name in downstream, got: {:?}",
3750            names
3751        );
3752    }
3753
3754    #[test]
3755    fn test_lineage_substring_function() {
3756        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3757        let node = lineage("short", &expr, None, false).unwrap();
3758
3759        let names = node.downstream_names();
3760        assert!(
3761            names.iter().any(|n| n == "t.name"),
3762            "Expected t.name in downstream, got: {:?}",
3763            names
3764        );
3765    }
3766
3767    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3768
3769    #[test]
3770    fn test_lineage_cte_select_star() {
3771        // Ported from sqlglot: test_lineage_source_with_star
3772        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3773        // After star expansion: SELECT y.a AS a FROM y
3774        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3775        let node = lineage("a", &expr, None, false).unwrap();
3776
3777        assert_eq!(node.name, "a");
3778        // Should successfully resolve column 'a' through the CTE
3779        // (previously failed with "Cannot find column 'a' in query")
3780        assert!(
3781            !node.downstream.is_empty(),
3782            "Expected downstream nodes tracing through CTE, got none"
3783        );
3784    }
3785
3786    #[test]
3787    fn test_lineage_cte_select_star_renamed_column() {
3788        // dbt standard pattern: CTE with column rename + outer SELECT *
3789        // This is the primary use case for dbt projects (jaffle-shop etc.)
3790        let expr =
3791            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3792        let node = lineage("customer_id", &expr, None, false).unwrap();
3793
3794        assert_eq!(node.name, "customer_id");
3795        // Should trace customer_id → renamed CTE → source.id
3796        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3797        assert!(
3798            all_names.len() >= 2,
3799            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3800            all_names
3801        );
3802    }
3803
3804    #[test]
3805    fn test_lineage_cte_select_star_multiple_columns() {
3806        // CTE exposes multiple columns, outer SELECT * should resolve each
3807        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3808
3809        for col in &["a", "b", "c"] {
3810            let node = lineage(col, &expr, None, false).unwrap();
3811            assert_eq!(node.name, *col);
3812            // Verify lineage resolves without error (star expanded to explicit columns)
3813            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3814            assert!(
3815                all_names.len() >= 2,
3816                "Expected at least 2 nodes for column {}, got: {:?}",
3817                col,
3818                all_names
3819            );
3820        }
3821    }
3822
3823    #[test]
3824    fn test_lineage_nested_cte_select_star() {
3825        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3826        let expr = parse(
3827            "WITH cte1 AS (SELECT a FROM t), \
3828             cte2 AS (SELECT * FROM cte1) \
3829             SELECT * FROM cte2",
3830        );
3831        let node = lineage("a", &expr, None, false).unwrap();
3832
3833        assert_eq!(node.name, "a");
3834        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3835        assert!(
3836            all_names.len() >= 3,
3837            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3838            all_names
3839        );
3840    }
3841
3842    #[test]
3843    fn test_lineage_three_level_nested_cte_star() {
3844        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3845        let expr = parse(
3846            "WITH cte1 AS (SELECT x FROM t), \
3847             cte2 AS (SELECT * FROM cte1), \
3848             cte3 AS (SELECT * FROM cte2) \
3849             SELECT * FROM cte3",
3850        );
3851        let node = lineage("x", &expr, None, false).unwrap();
3852
3853        assert_eq!(node.name, "x");
3854        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3855        assert!(
3856            all_names.len() >= 4,
3857            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3858            all_names
3859        );
3860    }
3861
3862    #[test]
3863    fn test_lineage_cte_union_star() {
3864        // CTE with UNION body, outer SELECT * should resolve from left branch
3865        let expr = parse(
3866            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3867             SELECT * FROM cte",
3868        );
3869        let node = lineage("a", &expr, None, false).unwrap();
3870
3871        assert_eq!(node.name, "a");
3872        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3873        assert!(
3874            all_names.len() >= 2,
3875            "Expected at least 2 nodes for CTE union star, got: {:?}",
3876            all_names
3877        );
3878    }
3879
3880    #[test]
3881    fn test_lineage_cte_star_unknown_table() {
3882        // When CTE references an unknown table, star expansion is skipped gracefully
3883        // and lineage falls back to normal resolution (which may fail)
3884        let expr = parse(
3885            "WITH cte AS (SELECT * FROM unknown_table) \
3886             SELECT * FROM cte",
3887        );
3888        // This should not panic — it may succeed or fail depending on resolution,
3889        // but should not crash
3890        let _result = lineage("x", &expr, None, false);
3891    }
3892
3893    #[test]
3894    fn test_lineage_cte_explicit_columns() {
3895        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3896        let expr = parse(
3897            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3898             SELECT * FROM cte",
3899        );
3900        let node = lineage("x", &expr, None, false).unwrap();
3901        assert_eq!(node.name, "x");
3902    }
3903
3904    #[test]
3905    fn test_lineage_cte_qualified_star() {
3906        // Qualified star: SELECT cte.* FROM cte
3907        let expr = parse(
3908            "WITH cte AS (SELECT a, b FROM t) \
3909             SELECT cte.* FROM cte",
3910        );
3911        for col in &["a", "b"] {
3912            let node = lineage(col, &expr, None, false).unwrap();
3913            assert_eq!(node.name, *col);
3914            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3915            assert!(
3916                all_names.len() >= 2,
3917                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3918                col,
3919                all_names
3920            );
3921        }
3922    }
3923
3924    #[test]
3925    fn test_lineage_subquery_select_star() {
3926        // Ported from sqlglot: test_select_star
3927        // SELECT x FROM (SELECT * FROM table_a)
3928        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3929        let node = lineage("x", &expr, None, false).unwrap();
3930
3931        assert_eq!(node.name, "x");
3932        assert!(
3933            !node.downstream.is_empty(),
3934            "Expected downstream nodes for subquery with SELECT *, got none"
3935        );
3936    }
3937
3938    #[test]
3939    fn test_lineage_cte_star_with_schema_external_table() {
3940        // CTE references an external table via SELECT * — schema enables expansion
3941        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3942SELECT * FROM orders"#;
3943        let expr = parse(sql);
3944
3945        let mut schema = MappingSchema::new();
3946        let cols = vec![
3947            ("order_id".to_string(), DataType::Unknown),
3948            ("customer_id".to_string(), DataType::Unknown),
3949            ("amount".to_string(), DataType::Unknown),
3950        ];
3951        schema.add_table("stg_orders", &cols, None).unwrap();
3952
3953        let node =
3954            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3955                .unwrap();
3956        assert_eq!(node.name, "order_id");
3957    }
3958
3959    #[test]
3960    fn test_lineage_cte_star_with_schema_three_part_name() {
3961        // CTE references an external table with fully-qualified 3-part name
3962        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3963SELECT * FROM orders"#;
3964        let expr = parse(sql);
3965
3966        let mut schema = MappingSchema::new();
3967        let cols = vec![
3968            ("order_id".to_string(), DataType::Unknown),
3969            ("customer_id".to_string(), DataType::Unknown),
3970        ];
3971        schema
3972            .add_table("db.schema.stg_orders", &cols, None)
3973            .unwrap();
3974
3975        let node = lineage_with_schema(
3976            "customer_id",
3977            &expr,
3978            Some(&schema as &dyn Schema),
3979            None,
3980            false,
3981        )
3982        .unwrap();
3983        assert_eq!(node.name, "customer_id");
3984    }
3985
3986    #[test]
3987    fn test_lineage_cte_star_with_schema_nested() {
3988        // Nested CTEs: outer CTE references inner CTE with SELECT *,
3989        // inner CTE references external table with SELECT *
3990        let sql = r#"WITH
3991            raw AS (SELECT * FROM external_table),
3992            enriched AS (SELECT * FROM raw)
3993        SELECT * FROM enriched"#;
3994        let expr = parse(sql);
3995
3996        let mut schema = MappingSchema::new();
3997        let cols = vec![
3998            ("id".to_string(), DataType::Unknown),
3999            ("name".to_string(), DataType::Unknown),
4000        ];
4001        schema.add_table("external_table", &cols, None).unwrap();
4002
4003        let node =
4004            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4005        assert_eq!(node.name, "name");
4006    }
4007
4008    #[test]
4009    fn test_lineage_cte_qualified_star_with_schema() {
4010        // CTE uses qualified star (orders.*) from a CTE whose columns
4011        // come from an external table via SELECT *
4012        let sql = r#"WITH
4013            orders AS (SELECT * FROM stg_orders),
4014            enriched AS (
4015                SELECT orders.*, 'extra' AS extra
4016                FROM orders
4017            )
4018        SELECT * FROM enriched"#;
4019        let expr = parse(sql);
4020
4021        let mut schema = MappingSchema::new();
4022        let cols = vec![
4023            ("order_id".to_string(), DataType::Unknown),
4024            ("total".to_string(), DataType::Unknown),
4025        ];
4026        schema.add_table("stg_orders", &cols, None).unwrap();
4027
4028        let node =
4029            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4030                .unwrap();
4031        assert_eq!(node.name, "order_id");
4032
4033        // Also verify the extra column works
4034        let extra =
4035            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4036        assert_eq!(extra.name, "extra");
4037    }
4038
4039    #[test]
4040    fn test_lineage_cte_star_without_schema_still_works() {
4041        // Without schema, CTE-to-CTE star expansion still works
4042        let sql = r#"WITH
4043            cte1 AS (SELECT id, name FROM raw_table),
4044            cte2 AS (SELECT * FROM cte1)
4045        SELECT * FROM cte2"#;
4046        let expr = parse(sql);
4047
4048        // No schema — should still resolve through CTE chain
4049        let node = lineage("id", &expr, None, false).unwrap();
4050        assert_eq!(node.name, "id");
4051    }
4052
4053    #[test]
4054    fn test_lineage_nested_cte_star_with_join_and_schema() {
4055        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
4056        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
4057        let sql = r#"WITH
4058base_orders AS (
4059    SELECT * FROM stg_orders
4060),
4061with_payments AS (
4062    SELECT
4063        base_orders.*,
4064        p.amount
4065    FROM base_orders
4066    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4067),
4068final_cte AS (
4069    SELECT * FROM with_payments
4070)
4071SELECT * FROM final_cte"#;
4072        let expr = parse(sql);
4073
4074        let mut schema = MappingSchema::new();
4075        let order_cols = vec![
4076            (
4077                "order_id".to_string(),
4078                crate::expressions::DataType::Unknown,
4079            ),
4080            (
4081                "customer_id".to_string(),
4082                crate::expressions::DataType::Unknown,
4083            ),
4084            ("status".to_string(), crate::expressions::DataType::Unknown),
4085        ];
4086        let pay_cols = vec![
4087            (
4088                "payment_id".to_string(),
4089                crate::expressions::DataType::Unknown,
4090            ),
4091            (
4092                "order_id".to_string(),
4093                crate::expressions::DataType::Unknown,
4094            ),
4095            ("amount".to_string(), crate::expressions::DataType::Unknown),
4096        ];
4097        schema.add_table("stg_orders", &order_cols, None).unwrap();
4098        schema.add_table("stg_payments", &pay_cols, None).unwrap();
4099
4100        // order_id should trace back to stg_orders
4101        let node =
4102            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4103                .unwrap();
4104        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4105
4106        // The leaf should be "stg_orders.order_id" (not just "order_id")
4107        let has_table_qualified = all_names
4108            .iter()
4109            .any(|n| n.contains('.') && n.contains("order_id"));
4110        assert!(
4111            has_table_qualified,
4112            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4113            all_names
4114        );
4115
4116        // amount should trace back to stg_payments
4117        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4118            .unwrap();
4119        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4120
4121        let has_table_qualified = all_names
4122            .iter()
4123            .any(|n| n.contains('.') && n.contains("amount"));
4124        assert!(
4125            has_table_qualified,
4126            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4127            all_names
4128        );
4129    }
4130
4131    #[test]
4132    fn test_lineage_cte_alias_resolution() {
4133        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
4134        let sql = r#"WITH import_stg_items AS (
4135    SELECT item_id, name, status FROM stg_items
4136)
4137SELECT base.item_id, base.status
4138FROM import_stg_items AS base"#;
4139        let expr = parse(sql);
4140
4141        let node = lineage("item_id", &expr, None, false).unwrap();
4142        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4143        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
4144        assert!(
4145            all_names.iter().any(|n| n == "stg_items.item_id"),
4146            "Expected leaf 'stg_items.item_id', got: {:?}",
4147            all_names
4148        );
4149    }
4150
4151    #[test]
4152    fn test_lineage_cte_alias_with_schema_and_star() {
4153        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
4154        let sql = r#"WITH import_stg AS (
4155    SELECT * FROM stg_items
4156)
4157SELECT base.item_id, base.status
4158FROM import_stg AS base"#;
4159        let expr = parse(sql);
4160
4161        let mut schema = MappingSchema::new();
4162        schema
4163            .add_table(
4164                "stg_items",
4165                &[
4166                    ("item_id".to_string(), DataType::Unknown),
4167                    ("name".to_string(), DataType::Unknown),
4168                    ("status".to_string(), DataType::Unknown),
4169                ],
4170                None,
4171            )
4172            .unwrap();
4173
4174        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4175            .unwrap();
4176        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4177        assert!(
4178            all_names.iter().any(|n| n == "stg_items.item_id"),
4179            "Expected leaf 'stg_items.item_id', got: {:?}",
4180            all_names
4181        );
4182    }
4183
4184    #[test]
4185    fn test_lineage_cte_alias_with_join() {
4186        // Multiple CTE aliases in a JOIN: each should resolve independently
4187        let sql = r#"WITH
4188    import_users AS (SELECT id, name FROM users),
4189    import_orders AS (SELECT id, user_id, amount FROM orders)
4190SELECT u.name, o.amount
4191FROM import_users AS u
4192LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4193        let expr = parse(sql);
4194
4195        let node = lineage("name", &expr, None, false).unwrap();
4196        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4197        assert!(
4198            all_names.iter().any(|n| n == "users.name"),
4199            "Expected leaf 'users.name', got: {:?}",
4200            all_names
4201        );
4202
4203        let node = lineage("amount", &expr, None, false).unwrap();
4204        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4205        assert!(
4206            all_names.iter().any(|n| n == "orders.amount"),
4207            "Expected leaf 'orders.amount', got: {:?}",
4208            all_names
4209        );
4210    }
4211
4212    // -----------------------------------------------------------------------
4213    // Quoted CTE name tests — verifying SQL identifier case semantics
4214    // -----------------------------------------------------------------------
4215
4216    #[test]
4217    fn test_lineage_unquoted_cte_case_insensitive() {
4218        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
4219        // MyCte and MYCTE should match.
4220        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4221        let node = lineage("col", &expr, None, false).unwrap();
4222        assert_eq!(node.name, "col");
4223        assert!(
4224            !node.downstream.is_empty(),
4225            "Unquoted CTE should resolve case-insensitively"
4226        );
4227    }
4228
4229    #[test]
4230    fn test_lineage_quoted_cte_case_preserved() {
4231        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
4232        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4233        let node = lineage("col", &expr, None, false).unwrap();
4234        assert_eq!(node.name, "col");
4235        assert!(
4236            !node.downstream.is_empty(),
4237            "Quoted CTE with matching case should resolve"
4238        );
4239    }
4240
4241    #[test]
4242    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4243        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
4244        // sqlglot treats this as a table reference, not a CTE match.
4245        // Star expansion should NOT resolve through the CTE.
4246        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4247        // lineage("col", ...) should fail because "mycte" is treated as an external
4248        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
4249        let result = lineage("col", &expr, None, false);
4250        assert!(
4251            result.is_err(),
4252            "Quoted CTE with case mismatch should not expand star: {:?}",
4253            result
4254        );
4255    }
4256
4257    #[test]
4258    fn test_lineage_mixed_quoted_unquoted_cte() {
4259        // Mix of unquoted and quoted CTEs in a nested chain.
4260        let expr = parse(
4261            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4262        );
4263        let node = lineage("a", &expr, None, false).unwrap();
4264        assert_eq!(node.name, "a");
4265        assert!(
4266            !node.downstream.is_empty(),
4267            "Mixed quoted/unquoted CTE chain should resolve"
4268        );
4269    }
4270
4271    // -----------------------------------------------------------------------
4272    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
4273    // -----------------------------------------------------------------------
4274    //
4275    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
4276    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
4277    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
4278    // direct string comparison for CTE name matching, ignoring the quoted status.
4279    //
4280    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
4281    // and unquoted as case-insensitive. The scope system should do the same.
4282    //
4283    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
4284    // which is broader than the star expansion scope of this PR.
4285
4286    #[test]
4287    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4288        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
4289        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
4290        // as "mycte" incorrectly resolves to the CTE.
4291        //
4292        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
4293        // case-sensitive: "mycte" should NOT match CTE "MyCte".
4294        //
4295        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4296        // this test should fail — update the assertion to match correct behavior:
4297        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
4298        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4299        let node = lineage("col", &expr, None, false).unwrap();
4300        assert!(!node.downstream.is_empty());
4301        let child = &node.downstream[0];
4302        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
4303        assert_eq!(
4304            child.source_name, "MyCte",
4305            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4306             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4307        );
4308    }
4309
4310    #[test]
4311    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4312        // Known bug: same as above but with qualified column reference ("mycte".col).
4313        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
4314        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
4315        //
4316        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4317        // this test should fail — update to assert source_name != "MyCte".
4318        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4319        let node = lineage("col", &expr, None, false).unwrap();
4320        assert!(!node.downstream.is_empty());
4321        let child = &node.downstream[0];
4322        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
4323        assert_eq!(
4324            child.source_name, "MyCte",
4325            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4326             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4327        );
4328    }
4329
4330    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
4331
4332    /// sqlglot: test_node_name_doesnt_contain_comment
4333    /// Comments in column expressions should not affect lineage resolution.
4334    /// NOTE: This test uses SELECT * from a derived table, which is a separate
4335    /// known limitation in polyglot-sql (star expansion in subqueries).
4336    #[test]
4337    #[ignore = "requires derived table star expansion (separate issue)"]
4338    fn test_node_name_doesnt_contain_comment() {
4339        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4340        let node = lineage("x", &expr, None, false).unwrap();
4341
4342        assert_eq!(node.name, "x");
4343        assert!(!node.downstream.is_empty());
4344    }
4345
4346    /// A line comment between SELECT and the first column wraps the column
4347    /// in an Annotated node. Lineage must unwrap it to find the column name.
4348    /// Verify that commented and uncommented queries produce identical lineage.
4349    #[test]
4350    fn test_comment_before_first_column_in_cte() {
4351        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
4352        let sql_without_comment = "with t as (select 1 as a) select a from t";
4353
4354        // Without comment — baseline
4355        let expr_ok = parse(sql_without_comment);
4356        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4357
4358        // With comment — should produce identical lineage
4359        let expr_comment = parse(sql_with_comment);
4360        let node_comment = lineage("a", &expr_comment, None, false)
4361            .expect("with comment before first column should succeed");
4362
4363        assert_eq!(node_ok.name, node_comment.name, "node names should match");
4364        assert_eq!(
4365            node_ok.downstream_names(),
4366            node_comment.downstream_names(),
4367            "downstream lineage should be identical with or without comment"
4368        );
4369    }
4370
4371    /// Block comment between SELECT and first column.
4372    #[test]
4373    fn test_block_comment_before_first_column() {
4374        let sql = "with t as (select 1 as a) select /* section */ a from t";
4375        let expr = parse(sql);
4376        let node = lineage("a", &expr, None, false)
4377            .expect("block comment before first column should succeed");
4378        assert_eq!(node.name, "a");
4379        assert!(
4380            !node.downstream.is_empty(),
4381            "should have downstream lineage"
4382        );
4383    }
4384
4385    /// Comment before first column should not affect second column resolution.
4386    #[test]
4387    fn test_comment_before_first_column_second_col_ok() {
4388        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
4389        let expr = parse(sql);
4390
4391        let node_a =
4392            lineage("a", &expr, None, false).expect("column a with comment should succeed");
4393        assert_eq!(node_a.name, "a");
4394
4395        let node_b =
4396            lineage("b", &expr, None, false).expect("column b with comment should succeed");
4397        assert_eq!(node_b.name, "b");
4398    }
4399
4400    /// Aliased column with preceding comment.
4401    #[test]
4402    fn test_comment_before_aliased_column() {
4403        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
4404        let expr = parse(sql);
4405        let node =
4406            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4407        assert_eq!(node.name, "y");
4408        assert!(
4409            !node.downstream.is_empty(),
4410            "aliased column should have downstream lineage"
4411        );
4412    }
4413}