Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Semantic source kind for downstream consumers.
33    pub source_kind: SourceKind,
34    /// User-written source alias when different from canonical source name.
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub source_alias: Option<String>,
37    /// Optional reference node name (e.g., for CTEs)
38    pub reference_node_name: String,
39}
40
41impl LineageNode {
42    /// Create a new lineage node
43    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44        Self {
45            name: name.into(),
46            expression,
47            source,
48            downstream: Vec::new(),
49            source_name: String::new(),
50            source_kind: SourceKind::Unknown,
51            source_alias: None,
52            reference_node_name: String::new(),
53        }
54    }
55
56    /// Iterate over all nodes in the lineage graph using DFS
57    pub fn walk(&self) -> LineageWalker<'_> {
58        LineageWalker { stack: vec![self] }
59    }
60
61    /// Get all downstream column names
62    pub fn downstream_names(&self) -> Vec<String> {
63        self.downstream.iter().map(|n| n.name.clone()).collect()
64    }
65}
66
67fn source_kind_for_scope_context(
68    scope: &Scope,
69    source_name: &str,
70    reference_node_name: &str,
71) -> SourceKind {
72    if source_name.is_empty() && reference_node_name.is_empty() {
73        return SourceKind::Root;
74    }
75    if let Some(source_info) = scope.sources.get(source_name) {
76        return source_info.kind;
77    }
78    if scope.cte_sources.contains_key(source_name) {
79        return SourceKind::Cte;
80    }
81    match scope.scope_type {
82        ScopeType::Cte => SourceKind::Cte,
83        ScopeType::DerivedTable => SourceKind::DerivedTable,
84        ScopeType::Udtf => SourceKind::Virtual,
85        _ => SourceKind::Unknown,
86    }
87}
88
89fn apply_scope_context(
90    node: &mut LineageNode,
91    scope: &Scope,
92    source_name: &str,
93    reference_node_name: &str,
94) {
95    node.source_name = source_name.to_string();
96    node.reference_node_name = reference_node_name.to_string();
97    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
100/// Iterator for walking the lineage graph
101pub struct LineageWalker<'a> {
102    stack: Vec<&'a LineageNode>,
103}
104
105impl<'a> Iterator for LineageWalker<'a> {
106    type Item = &'a LineageNode;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        if let Some(node) = self.stack.pop() {
110            // Add children in reverse order so they're visited in order
111            for child in node.downstream.iter().rev() {
112                self.stack.push(child);
113            }
114            Some(node)
115        } else {
116            None
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// ColumnRef: name or positional index for column lookup
123// ---------------------------------------------------------------------------
124
125/// Column reference for lineage tracing — by name or positional index.
126enum ColumnRef<'a> {
127    Name(&'a str),
128    Index(usize),
129}
130
131// ---------------------------------------------------------------------------
132// Public API
133// ---------------------------------------------------------------------------
134
135/// Build the lineage graph for a column in a SQL query
136///
137/// # Arguments
138/// * `column` - The column name to trace lineage for
139/// * `sql` - The SQL expression (SELECT, UNION, etc.)
140/// * `dialect` - Optional dialect for parsing
141/// * `trim_selects` - If true, trim the source SELECT to only include the target column
142///
143/// # Returns
144/// The root lineage node for the specified column
145///
146/// # Example
147/// ```ignore
148/// use polyglot_sql::lineage::lineage;
149/// use polyglot_sql::parse_one;
150/// use polyglot_sql::DialectType;
151///
152/// let sql = "SELECT a, b + 1 AS c FROM t";
153/// let expr = parse_one(sql, DialectType::Generic).unwrap();
154/// let node = lineage("c", &expr, None, false).unwrap();
155/// ```
156pub fn lineage(
157    column: &str,
158    sql: &Expression,
159    dialect: Option<DialectType>,
160    trim_selects: bool,
161) -> Result<LineageNode> {
162    // Fast path: skip clone when there are no CTEs to expand
163    let effective_sql = lineage_effective_expression(sql);
164    let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165    if !has_with {
166        return lineage_from_expression(column, sql, dialect, trim_selects);
167    }
168    let mut owned = sql.clone();
169    expand_cte_stars(&mut owned, None);
170    lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173/// Build the lineage graph for a column in a SQL query using optional schema metadata.
174///
175/// When `schema` is provided, the query is first qualified with
176/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
177/// ambiguous column references.
178///
179/// # Arguments
180/// * `column` - The column name to trace lineage for
181/// * `sql` - The SQL expression (SELECT, UNION, etc.)
182/// * `schema` - Optional schema used for qualification
183/// * `dialect` - Optional dialect for qualification and lineage handling
184/// * `trim_selects` - If true, trim the source SELECT to only include the target column
185///
186/// # Returns
187/// The root lineage node for the specified column
188pub fn lineage_with_schema(
189    column: &str,
190    sql: &Expression,
191    schema: Option<&dyn Schema>,
192    dialect: Option<DialectType>,
193    trim_selects: bool,
194) -> Result<LineageNode> {
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new().with_dialect(dialect_type)
198        } else {
199            QualifyColumnsOptions::new()
200        };
201
202        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203            Error::internal(format!("Lineage qualification failed with schema: {}", e))
204        })?
205    } else {
206        sql.clone()
207    };
208
209    // Annotate types in-place so lineage nodes carry type information
210    annotate_types(&mut qualified_expression, schema, dialect);
211
212    // Expand CTE stars on the already-owned expression (no extra clone).
213    // Pass schema so that stars from external tables can also be resolved.
214    expand_cte_stars(&mut qualified_expression, schema);
215
216    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220    column: &str,
221    sql: &Expression,
222    dialect: Option<DialectType>,
223    trim_selects: bool,
224) -> Result<LineageNode> {
225    let sql = lineage_effective_expression(sql);
226    let scope = build_scope(sql);
227    to_node(
228        ColumnRef::Name(column),
229        &scope,
230        dialect,
231        "",
232        "",
233        "",
234        trim_selects,
235    )
236}
237
238pub(crate) fn lineage_by_index_from_expression(
239    column_index: usize,
240    sql: &Expression,
241    dialect: Option<DialectType>,
242    trim_selects: bool,
243) -> Result<LineageNode> {
244    let sql = lineage_effective_expression(sql);
245    let scope = build_scope(sql);
246    to_node(
247        ColumnRef::Index(column_index),
248        &scope,
249        dialect,
250        "",
251        "",
252        "",
253        trim_selects,
254    )
255}
256
257fn lineage_effective_expression(sql: &Expression) -> &Expression {
258    match sql {
259        Expression::Prepare(prepare) => &prepare.statement,
260        _ => sql,
261    }
262}
263
264// ---------------------------------------------------------------------------
265// CTE star expansion
266// ---------------------------------------------------------------------------
267
268/// Normalize an identifier for CTE name matching.
269///
270/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
271/// quoted identifiers preserve their original case. This matches sqlglot's
272/// `normalize_identifiers` behavior.
273fn normalize_cte_name(ident: &Identifier) -> String {
274    if ident.quoted {
275        ident.name.clone()
276    } else {
277        ident.name.to_lowercase()
278    }
279}
280
281/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
282/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
283/// which qualify_columns cannot resolve because it processes each SELECT independently.
284///
285/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
286/// by looking up column names in the schema. This enables correct expansion of patterns
287/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
288///
289/// CTE name matching follows SQL identifier semantics: unquoted names are compared
290/// case-insensitively (lowercased), while quoted names preserve their original case.
291/// This matches sqlglot's `normalize_identifiers` behavior.
292pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
293    if let Expression::Prepare(prepare) = expr {
294        expand_cte_stars(&mut prepare.statement, schema);
295        return;
296    }
297
298    let select = match expr {
299        Expression::Select(s) => s,
300        _ => return,
301    };
302
303    let with = match &mut select.with {
304        Some(w) => w,
305        None => return,
306    };
307
308    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
309
310    for cte in &mut with.ctes {
311        let cte_name = normalize_cte_name(&cte.alias);
312
313        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
314        if !cte.columns.is_empty() {
315            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
316            resolved_cte_columns.insert(cte_name, cols);
317            continue;
318        }
319
320        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
321        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
322        // references itself. We detect this conservatively: if the CTE name appears as
323        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
324        // block are still expanded.
325        if with.recursive {
326            let is_self_referencing =
327                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
328                    let body_sources = get_select_sources(body_select);
329                    body_sources.iter().any(|s| s.normalized == cte_name)
330                } else {
331                    false
332                };
333            if is_self_referencing {
334                continue;
335            }
336        }
337
338        // Get the SELECT from the CTE body (handle UNION by taking left branch)
339        let body_select = match get_leftmost_select_mut(&mut cte.this) {
340            Some(s) => s,
341            None => continue,
342        };
343
344        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
345        resolved_cte_columns.insert(cte_name, columns);
346    }
347
348    // Also expand stars in the outer SELECT itself
349    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
350}
351
352/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
353///
354/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
355/// are determined by the left branch. This matches sqlglot's behavior.
356fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
357    let mut current = expr;
358    for _ in 0..MAX_LINEAGE_DEPTH {
359        match current {
360            Expression::Select(s) => return Some(s),
361            Expression::Union(u) => current = &mut u.left,
362            Expression::Intersect(i) => current = &mut i.left,
363            Expression::Except(e) => current = &mut e.left,
364            Expression::Paren(p) => current = &mut p.this,
365            _ => return None,
366        }
367    }
368    None
369}
370
371/// Rewrite star expressions in a SELECT using resolved CTE column lists.
372/// Falls back to `schema` for external table column lookup.
373/// Returns the list of output column names after expansion.
374fn rewrite_stars_in_select(
375    select: &mut Select,
376    resolved_ctes: &HashMap<String, Vec<String>>,
377    schema: Option<&dyn Schema>,
378) -> Vec<String> {
379    // The AST represents star expressions in two forms depending on syntax:
380    //   - `SELECT *`      → Expression::Star (unqualified star)
381    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
382    // Both must be checked to handle all star patterns.
383    let has_star = select
384        .expressions
385        .iter()
386        .any(|e| matches!(e, Expression::Star(_)));
387    let has_qualified_star = select
388        .expressions
389        .iter()
390        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
391
392    if !has_star && !has_qualified_star {
393        // No stars — just extract column names without rewriting
394        return select
395            .expressions
396            .iter()
397            .filter_map(get_expression_output_name)
398            .collect();
399    }
400
401    let sources = get_select_sources(select);
402    let mut new_expressions = Vec::new();
403    let mut result_columns = Vec::new();
404
405    for expr in &select.expressions {
406        match expr {
407            Expression::Star(star) => {
408                let qual = star.table.as_ref();
409                if let Some(expanded) =
410                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
411                {
412                    for (src_alias, col_name) in &expanded {
413                        let table_id = Identifier::new(src_alias);
414                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
415                        result_columns.push(col_name.clone());
416                    }
417                } else {
418                    new_expressions.push(expr.clone());
419                    result_columns.push("*".to_string());
420                }
421            }
422            Expression::Column(c) if c.name.name == "*" => {
423                let qual = c.table.as_ref();
424                if let Some(expanded) =
425                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
426                {
427                    for (_src_alias, col_name) in &expanded {
428                        // Keep the original table qualifier for qualified stars (table.*)
429                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
430                        result_columns.push(col_name.clone());
431                    }
432                } else {
433                    new_expressions.push(expr.clone());
434                    result_columns.push("*".to_string());
435                }
436            }
437            _ => {
438                new_expressions.push(expr.clone());
439                if let Some(name) = get_expression_output_name(expr) {
440                    result_columns.push(name);
441                }
442            }
443        }
444    }
445
446    select.expressions = new_expressions;
447    result_columns
448}
449
450/// Try to expand a star expression by looking up source columns from resolved CTEs,
451/// falling back to the schema for external tables.
452/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
453/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
454fn expand_star_from_sources(
455    qualifier: Option<&Identifier>,
456    sources: &[SourceInfo],
457    resolved_ctes: &HashMap<String, Vec<String>>,
458    schema: Option<&dyn Schema>,
459) -> Option<Vec<(String, String)>> {
460    let mut expanded = Vec::new();
461
462    if let Some(qual) = qualifier {
463        // Qualified star: table.*
464        let qual_normalized = normalize_cte_name(qual);
465        for src in sources {
466            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
467                // Try CTE first
468                if let Some(cols) = resolved_ctes.get(&src.normalized) {
469                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
470                    return Some(expanded);
471                }
472                // Fall back to schema
473                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
474                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
475                    return Some(expanded);
476                }
477            }
478        }
479        None
480    } else {
481        // Unqualified star: expand all sources.
482        // Intentionally conservative: if any source can't be resolved, the entire
483        // expansion is aborted. Partial expansion would produce an incomplete column
484        // list, causing downstream lineage resolution to silently omit columns.
485        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
486        let mut any_expanded = false;
487        for src in sources {
488            if let Some(cols) = resolved_ctes.get(&src.normalized) {
489                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
490                any_expanded = true;
491            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
492                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
493                any_expanded = true;
494            } else {
495                return None;
496            }
497        }
498        if any_expanded {
499            Some(expanded)
500        } else {
501            None
502        }
503    }
504}
505
506/// Look up column names for a table from the schema.
507fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
508    let schema = schema?;
509    if fq_name.is_empty() {
510        return None;
511    }
512    schema
513        .column_names(fq_name)
514        .ok()
515        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
516}
517
518/// Create a Column expression with the given name and optional table qualifier.
519fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
520    Expression::Column(Box::new(crate::expressions::Column {
521        name: Identifier::new(name),
522        table: table.cloned(),
523        join_mark: false,
524        trailing_comments: Vec::new(),
525        span: None,
526        inferred_type: None,
527    }))
528}
529
530/// Extract the output name of a SELECT expression.
531fn get_expression_output_name(expr: &Expression) -> Option<String> {
532    match expr {
533        Expression::Alias(a) => Some(a.alias.name.clone()),
534        Expression::Column(c) => Some(c.name.name.clone()),
535        Expression::Identifier(id) => Some(id.name.clone()),
536        Expression::Star(_) => Some("*".to_string()),
537        _ => None,
538    }
539}
540
541/// Source info extracted from a SELECT's FROM/JOIN clauses in a single pass.
542struct SourceInfo {
543    alias: String,
544    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
545    normalized: String,
546    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
547    fq_name: String,
548}
549
550/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
551/// SELECT's FROM and JOIN clauses in a single pass.
552fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
553    let mut sources = Vec::new();
554
555    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
556        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
557            SourceInfo {
558                alias: alias.name.clone(),
559                normalized: normalize_cte_name(alias),
560                fq_name: alias.name.clone(),
561            }
562        }
563
564        fn named_virtual_source_info(alias: &str) -> SourceInfo {
565            SourceInfo {
566                alias: alias.to_string(),
567                normalized: alias.to_lowercase(),
568                fq_name: alias.to_string(),
569            }
570        }
571
572        match expr {
573            Expression::Table(t) => {
574                let normalized = normalize_cte_name(&t.name);
575                let alias = t
576                    .alias
577                    .as_ref()
578                    .map(|a| a.name.clone())
579                    .unwrap_or_else(|| t.name.name.clone());
580                let mut parts = Vec::new();
581                if let Some(catalog) = &t.catalog {
582                    parts.push(catalog.name.clone());
583                }
584                if let Some(schema) = &t.schema {
585                    parts.push(schema.name.clone());
586                }
587                parts.push(t.name.name.clone());
588                let fq_name = parts.join(".");
589                Some(SourceInfo {
590                    alias,
591                    normalized,
592                    fq_name,
593                })
594            }
595            Expression::Subquery(s) => {
596                let alias = s.alias.as_ref()?.name.clone();
597                let normalized = alias.to_lowercase();
598                let fq_name = alias.clone();
599                Some(SourceInfo {
600                    alias,
601                    normalized,
602                    fq_name,
603                })
604            }
605            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
606            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
607                Some(virtual_source_info(&a.alias))
608            }
609            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
610            Expression::LateralView(lateral_view) => lateral_view
611                .table_alias
612                .as_ref()
613                .or_else(|| lateral_view.column_aliases.first())
614                .map(virtual_source_info),
615            Expression::Paren(p) => extract_source(&p.this),
616            _ => None,
617        }
618    }
619
620    if let Some(from) = &select.from {
621        for expr in &from.expressions {
622            if let Some(info) = extract_source(expr) {
623                sources.push(info);
624            }
625        }
626    }
627    for join in &select.joins {
628        if let Some(info) = extract_source(&join.this) {
629            sources.push(info);
630        }
631    }
632    for lateral_view in &select.lateral_views {
633        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
634        {
635            sources.push(info);
636        }
637    }
638    sources
639}
640
641/// Get all source tables from a lineage graph
642pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
643    let mut tables = HashSet::new();
644    collect_source_tables(node, &mut tables);
645    tables
646}
647
648/// Recursively collect source table names from lineage graph
649pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
650    if let Expression::Table(table) = &node.source {
651        tables.insert(table.name.name.clone());
652    }
653    for child in &node.downstream {
654        collect_source_tables(child, tables);
655    }
656}
657
658// ---------------------------------------------------------------------------
659// Core recursive lineage builder
660// ---------------------------------------------------------------------------
661
662/// Maximum recursion depth for lineage tracing to prevent stack overflow
663/// on circular or deeply nested CTE chains.
664const MAX_LINEAGE_DEPTH: usize = 64;
665
666/// Recursively build a lineage node for a column in a scope.
667fn to_node(
668    column: ColumnRef<'_>,
669    scope: &Scope,
670    dialect: Option<DialectType>,
671    scope_name: &str,
672    source_name: &str,
673    reference_node_name: &str,
674    trim_selects: bool,
675) -> Result<LineageNode> {
676    to_node_inner(
677        column,
678        scope,
679        dialect,
680        scope_name,
681        source_name,
682        reference_node_name,
683        trim_selects,
684        &[],
685        0,
686    )
687}
688
689fn to_node_inner(
690    column: ColumnRef<'_>,
691    scope: &Scope,
692    dialect: Option<DialectType>,
693    scope_name: &str,
694    source_name: &str,
695    reference_node_name: &str,
696    trim_selects: bool,
697    ancestor_cte_scopes: &[Scope],
698    depth: usize,
699) -> Result<LineageNode> {
700    if depth > MAX_LINEAGE_DEPTH {
701        return Err(Error::internal(format!(
702            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
703        )));
704    }
705    let scope_expr = &scope.expression;
706
707    // Build combined CTE scopes: current scope's cte_scopes + ancestors
708    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
709    for s in ancestor_cte_scopes {
710        all_cte_scopes.push(s);
711    }
712
713    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
714    //    but we need the inner query (SELECT/UNION) for column lookup.
715    let effective_expr = match scope_expr {
716        Expression::Cte(cte) => &cte.this,
717        other => other,
718    };
719
720    // 1. Set operations (UNION / INTERSECT / EXCEPT)
721    if matches!(
722        effective_expr,
723        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
724    ) {
725        // For CTE wrapping a set op, create a temporary scope with the inner expression
726        if matches!(scope_expr, Expression::Cte(_)) {
727            let mut inner_scope = Scope::new(effective_expr.clone());
728            inner_scope.union_scopes = scope.union_scopes.clone();
729            inner_scope.sources = scope.sources.clone();
730            inner_scope.cte_sources = scope.cte_sources.clone();
731            inner_scope.cte_scopes = scope.cte_scopes.clone();
732            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
733            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
734            return handle_set_operation(
735                &column,
736                &inner_scope,
737                dialect,
738                scope_name,
739                source_name,
740                reference_node_name,
741                trim_selects,
742                ancestor_cte_scopes,
743                depth,
744            );
745        }
746        return handle_set_operation(
747            &column,
748            scope,
749            dialect,
750            scope_name,
751            source_name,
752            reference_node_name,
753            trim_selects,
754            ancestor_cte_scopes,
755            depth,
756        );
757    }
758
759    // 2. Find the select expression for this column
760    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
761    let column_name = resolve_column_name(&column, &select_expr);
762
763    // 3. Trim source if requested
764    let node_source = if trim_selects {
765        trim_source(effective_expr, &select_expr)
766    } else {
767        effective_expr.clone()
768    };
769
770    // 4. Create the lineage node
771    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
772    apply_scope_context(&mut node, scope, source_name, reference_node_name);
773
774    // 5. Star handling — add downstream for each source
775    if let Expression::Star(star) = &select_expr {
776        let star_table = star
777            .table
778            .as_ref()
779            .map(|identifier| identifier.name.as_str());
780        for (name, source_info) in &scope.sources {
781            if let Some(star_table) = star_table {
782                let table_matches = name.eq_ignore_ascii_case(star_table)
783                    || source_info
784                        .alias
785                        .as_deref()
786                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
787                    || matches!(
788                        &source_info.expression,
789                        Expression::Table(table_ref)
790                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
791                    );
792                if !table_matches {
793                    continue;
794                }
795            }
796
797            let mut child = LineageNode::new(
798                format!("{}.*", name),
799                Expression::Star(crate::expressions::Star {
800                    table: star.table.clone(),
801                    except: None,
802                    replace: None,
803                    rename: None,
804                    trailing_comments: vec![],
805                    span: None,
806                }),
807                source_info.expression.clone(),
808            );
809            apply_source_info_context(&mut child, name, source_info);
810            node.downstream.push(child);
811        }
812        return Ok(node);
813    }
814
815    // 6. Subqueries in select — trace through scalar subqueries
816    let subqueries: Vec<&Expression> =
817        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
818    for sq_expr in subqueries {
819        if let Expression::Subquery(sq) = sq_expr {
820            for sq_scope in &scope.subquery_scopes {
821                if sq_scope.expression == sq.this {
822                    if let Ok(child) = to_node_inner(
823                        ColumnRef::Index(0),
824                        sq_scope,
825                        dialect,
826                        &column_name,
827                        "",
828                        "",
829                        trim_selects,
830                        ancestor_cte_scopes,
831                        depth + 1,
832                    ) {
833                        node.downstream.push(child);
834                    }
835                    break;
836                }
837            }
838        }
839    }
840
841    // 7. Column references — trace each column to its source
842    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
843    for col_ref in col_refs {
844        let col_name = &col_ref.column;
845        if let Some(ref table_id) = col_ref.table {
846            let tbl = &table_id.name;
847            resolve_qualified_column(
848                &mut node,
849                scope,
850                dialect,
851                tbl,
852                col_name,
853                &column_name,
854                trim_selects,
855                &all_cte_scopes,
856                depth,
857            );
858        } else {
859            resolve_unqualified_column(
860                &mut node,
861                scope,
862                dialect,
863                col_name,
864                &column_name,
865                trim_selects,
866                &all_cte_scopes,
867                depth,
868            );
869        }
870    }
871
872    Ok(node)
873}
874
875// ---------------------------------------------------------------------------
876// Set operation handling
877// ---------------------------------------------------------------------------
878
879fn handle_set_operation(
880    column: &ColumnRef<'_>,
881    scope: &Scope,
882    dialect: Option<DialectType>,
883    scope_name: &str,
884    source_name: &str,
885    reference_node_name: &str,
886    trim_selects: bool,
887    ancestor_cte_scopes: &[Scope],
888    depth: usize,
889) -> Result<LineageNode> {
890    let scope_expr = &scope.expression;
891
892    // Determine column index
893    let col_index = match column {
894        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
895        ColumnRef::Index(i) => *i,
896    };
897
898    let col_name = match column {
899        ColumnRef::Name(name) => name.to_string(),
900        ColumnRef::Index(_) => format!("_{col_index}"),
901    };
902
903    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
904    apply_scope_context(&mut node, scope, source_name, reference_node_name);
905
906    // Recurse into each union branch
907    for branch_scope in &scope.union_scopes {
908        if let Ok(child) = to_node_inner(
909            ColumnRef::Index(col_index),
910            branch_scope,
911            dialect,
912            scope_name,
913            "",
914            "",
915            trim_selects,
916            ancestor_cte_scopes,
917            depth + 1,
918        ) {
919            node.downstream.push(child);
920        }
921    }
922
923    Ok(node)
924}
925
926// ---------------------------------------------------------------------------
927// Column resolution helpers
928// ---------------------------------------------------------------------------
929
930fn resolve_qualified_column(
931    node: &mut LineageNode,
932    scope: &Scope,
933    dialect: Option<DialectType>,
934    table: &str,
935    col_name: &str,
936    parent_name: &str,
937    trim_selects: bool,
938    all_cte_scopes: &[&Scope],
939    depth: usize,
940) {
941    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
942    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
943    let resolved_cte_name = resolve_cte_alias(scope, table);
944    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
945
946    // Check if table is a CTE reference — check both the current scope's cte_sources
947    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
948    let is_cte = scope.cte_sources.contains_key(effective_table)
949        || all_cte_scopes.iter().any(
950            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
951        );
952    if is_cte {
953        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
954            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
955            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
956            if let Ok(child) = to_node_inner(
957                ColumnRef::Name(col_name),
958                child_scope,
959                dialect,
960                parent_name,
961                effective_table,
962                parent_name,
963                trim_selects,
964                &ancestors,
965                depth + 1,
966            ) {
967                node.downstream.push(child);
968                return;
969            }
970        }
971    }
972
973    // Check if table is a derived table (is_scope = true in sources)
974    if let Some(source_info) = scope.sources.get(table) {
975        if source_info.is_scope {
976            if let Some(child_scope) = find_child_scope(scope, table) {
977                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
978                if let Ok(child) = to_node_inner(
979                    ColumnRef::Name(col_name),
980                    child_scope,
981                    dialect,
982                    parent_name,
983                    table,
984                    parent_name,
985                    trim_selects,
986                    &ancestors,
987                    depth + 1,
988                ) {
989                    node.downstream.push(child);
990                    return;
991                }
992            }
993        }
994    }
995
996    // Base table source found in current scope: preserve alias in the display name
997    // but store the resolved table expression and name for downstream consumers.
998    if let Some(source_info) = scope.sources.get(table) {
999        if !source_info.is_scope {
1000            let mut child = make_table_column_node_from_source(table, col_name, source_info);
1001            if source_info.kind == SourceKind::Virtual {
1002                attach_virtual_source_dependencies(
1003                    &mut child,
1004                    scope,
1005                    dialect,
1006                    table,
1007                    &source_info.expression,
1008                    trim_selects,
1009                    all_cte_scopes,
1010                    depth,
1011                );
1012            }
1013            node.downstream.push(child);
1014            return;
1015        }
1016    }
1017
1018    // Base table or unresolved — terminal node
1019    node.downstream
1020        .push(make_table_column_node(table, col_name));
1021}
1022
1023/// Resolve a FROM alias to the original CTE name.
1024///
1025/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
1026/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
1027/// This function checks if `name` is such an alias and returns the CTE name.
1028fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1029    // If it's already a known CTE name, no resolution needed
1030    if scope.cte_sources.contains_key(name) {
1031        return None;
1032    }
1033    // Check if the source's expression is a CTE — if so, extract the CTE name
1034    if let Some(source_info) = scope.sources.get(name) {
1035        if source_info.is_scope {
1036            if let Expression::Cte(cte) = &source_info.expression {
1037                let cte_name = &cte.alias.name;
1038                if scope.cte_sources.contains_key(cte_name) {
1039                    return Some(cte_name.clone());
1040                }
1041            }
1042        }
1043    }
1044    None
1045}
1046
1047fn resolve_unqualified_column(
1048    node: &mut LineageNode,
1049    scope: &Scope,
1050    dialect: Option<DialectType>,
1051    col_name: &str,
1052    parent_name: &str,
1053    trim_selects: bool,
1054    all_cte_scopes: &[&Scope],
1055    depth: usize,
1056) {
1057    // Try to find which source this column belongs to.
1058    // Build the source list from the actual FROM/JOIN clauses to avoid
1059    // mixing in CTE definitions that are in scope but not referenced.
1060    let from_source_names = source_names_from_from_join(scope);
1061
1062    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1063        resolve_qualified_column(
1064            node,
1065            scope,
1066            dialect,
1067            &tbl,
1068            col_name,
1069            parent_name,
1070            trim_selects,
1071            all_cte_scopes,
1072            depth,
1073        );
1074        return;
1075    }
1076
1077    if from_source_names.len() == 1 {
1078        let tbl = &from_source_names[0];
1079        resolve_qualified_column(
1080            node,
1081            scope,
1082            dialect,
1083            tbl,
1084            col_name,
1085            parent_name,
1086            trim_selects,
1087            all_cte_scopes,
1088            depth,
1089        );
1090        return;
1091    }
1092
1093    // Multiple sources — can't resolve without schema info, add unqualified node
1094    let child = LineageNode::new(
1095        col_name.to_string(),
1096        Expression::Column(Box::new(crate::expressions::Column {
1097            name: crate::expressions::Identifier::new(col_name.to_string()),
1098            table: None,
1099            join_mark: false,
1100            trailing_comments: vec![],
1101            span: None,
1102            inferred_type: None,
1103        })),
1104        node.source.clone(),
1105    );
1106    node.downstream.push(child);
1107}
1108
1109fn unique_virtual_source_for_column(
1110    scope: &Scope,
1111    source_names: &[String],
1112    col_name: &str,
1113) -> Option<String> {
1114    let mut matches = source_names.iter().filter_map(|source_name| {
1115        let source = scope.sources.get(source_name)?;
1116        if source.kind == SourceKind::Virtual
1117            && virtual_source_output_columns(source)
1118                .any(|column| column.eq_ignore_ascii_case(col_name))
1119        {
1120            Some(source_name.clone())
1121        } else {
1122            None
1123        }
1124    });
1125
1126    let first = matches.next()?;
1127    if matches.next().is_none() {
1128        Some(first)
1129    } else {
1130        None
1131    }
1132}
1133
1134fn virtual_source_output_columns(
1135    source_info: &ScopeSourceInfo,
1136) -> Box<dyn Iterator<Item = String> + '_> {
1137    match &source_info.expression {
1138        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1139        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1140            Box::new(alias_output_columns(alias))
1141        }
1142        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1143        Expression::LateralView(lateral_view) => {
1144            Box::new(lateral_view_output_columns(lateral_view))
1145        }
1146        _ => Box::new(source_info.alias.clone().into_iter()),
1147    }
1148}
1149
1150fn unnest_output_columns(
1151    unnest: &crate::expressions::UnnestFunc,
1152) -> impl Iterator<Item = String> + '_ {
1153    unnest
1154        .alias
1155        .iter()
1156        .map(|alias| alias.name.clone())
1157        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1158}
1159
1160fn alias_output_columns(
1161    alias: &crate::expressions::Alias,
1162) -> Box<dyn Iterator<Item = String> + '_> {
1163    if alias.column_aliases.is_empty() {
1164        Box::new(std::iter::once(alias.alias.name.clone()))
1165    } else {
1166        Box::new(
1167            alias
1168                .column_aliases
1169                .iter()
1170                .map(|column| column.name.clone()),
1171        )
1172    }
1173}
1174
1175fn lateral_output_columns(
1176    lateral: &crate::expressions::Lateral,
1177) -> Box<dyn Iterator<Item = String> + '_> {
1178    if lateral.column_aliases.is_empty() {
1179        default_virtual_output_columns(&lateral.this)
1180    } else {
1181        Box::new(lateral.column_aliases.iter().cloned())
1182    }
1183}
1184
1185fn lateral_view_output_columns(
1186    lateral_view: &crate::expressions::LateralView,
1187) -> Box<dyn Iterator<Item = String> + '_> {
1188    Box::new(
1189        lateral_view
1190            .column_aliases
1191            .iter()
1192            .map(|column| column.name.clone()),
1193    )
1194}
1195
1196fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1197    match expr {
1198        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1199        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1200            alias_output_columns(alias)
1201        }
1202        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1203            Box::new(
1204                ["seq", "key", "path", "index", "value", "this"]
1205                    .into_iter()
1206                    .map(String::from),
1207            )
1208        }
1209        _ => Box::new(std::iter::empty()),
1210    }
1211}
1212
1213fn attach_virtual_source_dependencies(
1214    node: &mut LineageNode,
1215    scope: &Scope,
1216    dialect: Option<DialectType>,
1217    source_alias: &str,
1218    source_expr: &Expression,
1219    trim_selects: bool,
1220    all_cte_scopes: &[&Scope],
1221    depth: usize,
1222) {
1223    let parent_name = node.name.clone();
1224    let mut seen = HashSet::new();
1225    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1226        let key = (
1227            col_ref.table.as_ref().map(|t| t.name.clone()),
1228            col_ref.column.clone(),
1229        );
1230        if !seen.insert(key) {
1231            continue;
1232        }
1233
1234        if let Some(table_id) = col_ref.table {
1235            let table = table_id.name;
1236            if table == source_alias {
1237                continue;
1238            }
1239            resolve_qualified_column(
1240                node,
1241                scope,
1242                dialect,
1243                &table,
1244                &col_ref.column,
1245                &parent_name,
1246                trim_selects,
1247                all_cte_scopes,
1248                depth + 1,
1249            );
1250        } else {
1251            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1252            if non_virtual_sources.len() == 1 {
1253                resolve_qualified_column(
1254                    node,
1255                    scope,
1256                    dialect,
1257                    &non_virtual_sources[0],
1258                    &col_ref.column,
1259                    &parent_name,
1260                    trim_selects,
1261                    all_cte_scopes,
1262                    depth + 1,
1263                );
1264            }
1265        }
1266    }
1267}
1268
1269fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1270    fn source_name(expr: &Expression) -> Option<String> {
1271        match expr {
1272            Expression::Table(table) => Some(
1273                table
1274                    .alias
1275                    .as_ref()
1276                    .map(|a| a.name.clone())
1277                    .unwrap_or_else(|| table.name.name.clone()),
1278            ),
1279            Expression::Subquery(subquery) => {
1280                subquery.alias.as_ref().map(|alias| alias.name.clone())
1281            }
1282            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1283            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1284                Some(alias.alias.name.clone())
1285            }
1286            Expression::Lateral(lateral) => lateral.alias.clone(),
1287            Expression::LateralView(lateral_view) => lateral_view
1288                .table_alias
1289                .as_ref()
1290                .or_else(|| lateral_view.column_aliases.first())
1291                .map(|alias| alias.name.clone()),
1292            Expression::Paren(paren) => source_name(&paren.this),
1293            _ => None,
1294        }
1295    }
1296
1297    let effective_expr = match &scope.expression {
1298        Expression::Cte(cte) => &cte.this,
1299        expr => expr,
1300    };
1301
1302    let mut names = Vec::new();
1303    let mut seen = std::collections::HashSet::new();
1304
1305    if let Expression::Select(select) = effective_expr {
1306        if let Some(from) = &select.from {
1307            for expr in &from.expressions {
1308                if let Some(name) = source_name(expr) {
1309                    if !name.is_empty() && seen.insert(name.clone()) {
1310                        names.push(name);
1311                    }
1312                }
1313            }
1314        }
1315        for join in &select.joins {
1316            if let Some(name) = source_name(&join.this) {
1317                if !name.is_empty() && seen.insert(name.clone()) {
1318                    names.push(name);
1319                }
1320            }
1321        }
1322        for lateral_view in &select.lateral_views {
1323            if let Some(name) =
1324                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1325            {
1326                if !name.is_empty() && seen.insert(name.clone()) {
1327                    names.push(name);
1328                }
1329            }
1330        }
1331    }
1332
1333    names
1334}
1335
1336fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1337    source_names_from_from_join(scope)
1338        .into_iter()
1339        .filter(|name| {
1340            !matches!(
1341                scope.sources.get(name).map(|source| source.kind),
1342                Some(SourceKind::Virtual)
1343            )
1344        })
1345        .collect()
1346}
1347
1348// ---------------------------------------------------------------------------
1349// Helper functions
1350// ---------------------------------------------------------------------------
1351
1352/// Get the alias or name of an expression
1353fn get_alias_or_name(expr: &Expression) -> Option<String> {
1354    match expr {
1355        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1356        Expression::Column(col) => Some(col.name.name.clone()),
1357        Expression::Identifier(id) => Some(id.name.clone()),
1358        Expression::Star(_) => Some("*".to_string()),
1359        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1360        // Unwrap to get the actual column/alias name from the inner expression.
1361        Expression::Annotated(a) => get_alias_or_name(&a.this),
1362        _ => None,
1363    }
1364}
1365
1366/// Resolve the display name for a column reference.
1367fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1368    match column {
1369        ColumnRef::Name(n) => n.to_string(),
1370        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1371    }
1372}
1373
1374/// Find the select expression matching a column reference.
1375fn find_select_expr(
1376    scope_expr: &Expression,
1377    column: &ColumnRef<'_>,
1378    dialect: Option<DialectType>,
1379) -> Result<Expression> {
1380    if let Expression::Select(ref select) = scope_expr {
1381        match column {
1382            ColumnRef::Name(name) => {
1383                let normalized_name = normalize_column_name(name, dialect);
1384                for expr in &select.expressions {
1385                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1386                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1387                            return Ok(expr.clone());
1388                        }
1389                    }
1390                }
1391                Err(crate::error::Error::parse(
1392                    format!("Cannot find column '{}' in query", name),
1393                    0,
1394                    0,
1395                    0,
1396                    0,
1397                ))
1398            }
1399            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1400                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1401            }),
1402        }
1403    } else {
1404        Err(crate::error::Error::parse(
1405            "Expected SELECT expression for column lookup",
1406            0,
1407            0,
1408            0,
1409            0,
1410        ))
1411    }
1412}
1413
1414/// Find the positional index of a column name in a set operation's first SELECT branch.
1415fn column_to_index(
1416    set_op_expr: &Expression,
1417    name: &str,
1418    dialect: Option<DialectType>,
1419) -> Result<usize> {
1420    let normalized_name = normalize_column_name(name, dialect);
1421    let mut expr = set_op_expr;
1422    loop {
1423        match expr {
1424            Expression::Union(u) => expr = &u.left,
1425            Expression::Intersect(i) => expr = &i.left,
1426            Expression::Except(e) => expr = &e.left,
1427            Expression::Select(select) => {
1428                for (i, e) in select.expressions.iter().enumerate() {
1429                    if let Some(alias_or_name) = get_alias_or_name(e) {
1430                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1431                            return Ok(i);
1432                        }
1433                    }
1434                }
1435                return Err(crate::error::Error::parse(
1436                    format!("Cannot find column '{}' in set operation", name),
1437                    0,
1438                    0,
1439                    0,
1440                    0,
1441                ));
1442            }
1443            _ => {
1444                return Err(crate::error::Error::parse(
1445                    "Expected SELECT or set operation",
1446                    0,
1447                    0,
1448                    0,
1449                    0,
1450                ))
1451            }
1452        }
1453    }
1454}
1455
1456fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1457    normalize_name(name, dialect, false, true)
1458}
1459
1460/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1461fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1462    if let Expression::Select(select) = select_expr {
1463        let mut trimmed = select.as_ref().clone();
1464        trimmed.expressions = vec![target_expr.clone()];
1465        Expression::Select(Box::new(trimmed))
1466    } else {
1467        select_expr.clone()
1468    }
1469}
1470
1471/// Find the child scope (CTE or derived table) for a given source name.
1472fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1473    // Check CTE scopes
1474    if scope.cte_sources.contains_key(source_name) {
1475        for cte_scope in &scope.cte_scopes {
1476            if let Expression::Cte(cte) = &cte_scope.expression {
1477                if cte.alias.name == source_name {
1478                    return Some(cte_scope);
1479                }
1480            }
1481        }
1482    }
1483
1484    // Check derived table scopes
1485    if let Some(source_info) = scope.sources.get(source_name) {
1486        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1487            if let Expression::Subquery(sq) = &source_info.expression {
1488                for dt_scope in &scope.derived_table_scopes {
1489                    if dt_scope.expression == sq.this {
1490                        return Some(dt_scope);
1491                    }
1492                }
1493            }
1494        }
1495    }
1496
1497    None
1498}
1499
1500/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1501/// This handles nested CTEs where the current scope doesn't have the CTE scope
1502/// as a direct child but knows about it via cte_sources.
1503fn find_child_scope_in<'a>(
1504    all_cte_scopes: &[&'a Scope],
1505    scope: &'a Scope,
1506    source_name: &str,
1507) -> Option<&'a Scope> {
1508    // First try the scope's own cte_scopes
1509    for cte_scope in &scope.cte_scopes {
1510        if let Expression::Cte(cte) = &cte_scope.expression {
1511            if cte.alias.name == source_name {
1512                return Some(cte_scope);
1513            }
1514        }
1515    }
1516
1517    // Then search through all ancestor CTE scopes
1518    for cte_scope in all_cte_scopes {
1519        if let Expression::Cte(cte) = &cte_scope.expression {
1520            if cte.alias.name == source_name {
1521                return Some(cte_scope);
1522            }
1523        }
1524    }
1525
1526    // Fall back to derived table scopes
1527    if let Some(source_info) = scope.sources.get(source_name) {
1528        if source_info.is_scope {
1529            if let Expression::Subquery(sq) = &source_info.expression {
1530                for dt_scope in &scope.derived_table_scopes {
1531                    if dt_scope.expression == sq.this {
1532                        return Some(dt_scope);
1533                    }
1534                }
1535            }
1536        }
1537    }
1538
1539    None
1540}
1541
1542/// Create a terminal lineage node for a table.column reference.
1543fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1544    let mut node = LineageNode::new(
1545        format!("{}.{}", table, column),
1546        Expression::Column(Box::new(crate::expressions::Column {
1547            name: crate::expressions::Identifier::new(column.to_string()),
1548            table: Some(crate::expressions::Identifier::new(table.to_string())),
1549            join_mark: false,
1550            trailing_comments: vec![],
1551            span: None,
1552            inferred_type: None,
1553        })),
1554        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1555    );
1556    node.source_name = table.to_string();
1557    node.source_kind = SourceKind::Table;
1558    node
1559}
1560
1561fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1562    let mut parts: Vec<String> = Vec::new();
1563    if let Some(catalog) = &table_ref.catalog {
1564        parts.push(catalog.name.clone());
1565    }
1566    if let Some(schema) = &table_ref.schema {
1567        parts.push(schema.name.clone());
1568    }
1569    parts.push(table_ref.name.name.clone());
1570    parts.join(".")
1571}
1572
1573fn apply_source_info_context(
1574    node: &mut LineageNode,
1575    source_key: &str,
1576    source_info: &ScopeSourceInfo,
1577) {
1578    node.source_kind = source_info.kind;
1579    node.source_name =
1580        source_info
1581            .lineage_name
1582            .clone()
1583            .unwrap_or_else(|| match &source_info.expression {
1584                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1585                _ => source_key.to_string(),
1586            });
1587    node.source_alias = source_info.alias.clone();
1588}
1589
1590fn make_table_column_node_from_source(
1591    source_key: &str,
1592    column: &str,
1593    source_info: &ScopeSourceInfo,
1594) -> LineageNode {
1595    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1596    let mut node = LineageNode::new(
1597        format!("{}.{}", lineage_name, column),
1598        Expression::Column(Box::new(crate::expressions::Column {
1599            name: crate::expressions::Identifier::new(column.to_string()),
1600            table: Some(crate::expressions::Identifier::new(
1601                lineage_name.to_string(),
1602            )),
1603            join_mark: false,
1604            trailing_comments: vec![],
1605            span: None,
1606            inferred_type: None,
1607        })),
1608        source_info.expression.clone(),
1609    );
1610
1611    apply_source_info_context(&mut node, source_key, source_info);
1612
1613    node
1614}
1615
1616/// Simple column reference extracted from an expression
1617#[derive(Debug, Clone)]
1618struct SimpleColumnRef {
1619    table: Option<crate::expressions::Identifier>,
1620    column: String,
1621}
1622
1623/// Find all column references in an expression (does not recurse into subqueries).
1624fn find_column_refs_in_expr(
1625    expr: &Expression,
1626    dialect: Option<DialectType>,
1627) -> Vec<SimpleColumnRef> {
1628    let mut refs = Vec::new();
1629    collect_column_refs(expr, dialect, &mut refs);
1630    refs
1631}
1632
1633fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1634    match expr {
1635        Expression::Column(col) => {
1636            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1637        }
1638        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1639        _ => false,
1640    }
1641}
1642
1643fn collect_column_refs(
1644    expr: &Expression,
1645    dialect: Option<DialectType>,
1646    refs: &mut Vec<SimpleColumnRef>,
1647) {
1648    let mut stack: Vec<&Expression> = vec![expr];
1649
1650    while let Some(current) = stack.pop() {
1651        match current {
1652            // === Leaf: collect Column references ===
1653            Expression::Column(col) => {
1654                refs.push(SimpleColumnRef {
1655                    table: col.table.clone(),
1656                    column: col.name.name.clone(),
1657                });
1658            }
1659
1660            // === Boundary: don't recurse into subqueries (handled separately) ===
1661            Expression::Subquery(_) | Expression::Exists(_) => {}
1662
1663            // === BinaryOp variants: left, right ===
1664            Expression::And(op)
1665            | Expression::Or(op)
1666            | Expression::Eq(op)
1667            | Expression::Neq(op)
1668            | Expression::Lt(op)
1669            | Expression::Lte(op)
1670            | Expression::Gt(op)
1671            | Expression::Gte(op)
1672            | Expression::Add(op)
1673            | Expression::Sub(op)
1674            | Expression::Mul(op)
1675            | Expression::Div(op)
1676            | Expression::Mod(op)
1677            | Expression::BitwiseAnd(op)
1678            | Expression::BitwiseOr(op)
1679            | Expression::BitwiseXor(op)
1680            | Expression::BitwiseLeftShift(op)
1681            | Expression::BitwiseRightShift(op)
1682            | Expression::Concat(op)
1683            | Expression::Adjacent(op)
1684            | Expression::TsMatch(op)
1685            | Expression::PropertyEQ(op)
1686            | Expression::ArrayContainsAll(op)
1687            | Expression::ArrayContainedBy(op)
1688            | Expression::ArrayOverlaps(op)
1689            | Expression::JSONBContainsAllTopKeys(op)
1690            | Expression::JSONBContainsAnyTopKeys(op)
1691            | Expression::JSONBDeleteAtPath(op)
1692            | Expression::ExtendsLeft(op)
1693            | Expression::ExtendsRight(op)
1694            | Expression::Is(op)
1695            | Expression::MemberOf(op)
1696            | Expression::NullSafeEq(op)
1697            | Expression::NullSafeNeq(op)
1698            | Expression::Glob(op)
1699            | Expression::Match(op) => {
1700                stack.push(&op.left);
1701                stack.push(&op.right);
1702            }
1703
1704            // === UnaryOp variants: this ===
1705            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1706                stack.push(&u.this);
1707            }
1708
1709            // === UnaryFunc variants: this ===
1710            Expression::Upper(f)
1711            | Expression::Lower(f)
1712            | Expression::Length(f)
1713            | Expression::LTrim(f)
1714            | Expression::RTrim(f)
1715            | Expression::Reverse(f)
1716            | Expression::Abs(f)
1717            | Expression::Sqrt(f)
1718            | Expression::Cbrt(f)
1719            | Expression::Ln(f)
1720            | Expression::Exp(f)
1721            | Expression::Sign(f)
1722            | Expression::Date(f)
1723            | Expression::Time(f)
1724            | Expression::DateFromUnixDate(f)
1725            | Expression::UnixDate(f)
1726            | Expression::UnixSeconds(f)
1727            | Expression::UnixMillis(f)
1728            | Expression::UnixMicros(f)
1729            | Expression::TimeStrToDate(f)
1730            | Expression::DateToDi(f)
1731            | Expression::DiToDate(f)
1732            | Expression::TsOrDiToDi(f)
1733            | Expression::TsOrDsToDatetime(f)
1734            | Expression::TsOrDsToTimestamp(f)
1735            | Expression::YearOfWeek(f)
1736            | Expression::YearOfWeekIso(f)
1737            | Expression::Initcap(f)
1738            | Expression::Ascii(f)
1739            | Expression::Chr(f)
1740            | Expression::Soundex(f)
1741            | Expression::ByteLength(f)
1742            | Expression::Hex(f)
1743            | Expression::LowerHex(f)
1744            | Expression::Unicode(f)
1745            | Expression::Radians(f)
1746            | Expression::Degrees(f)
1747            | Expression::Sin(f)
1748            | Expression::Cos(f)
1749            | Expression::Tan(f)
1750            | Expression::Asin(f)
1751            | Expression::Acos(f)
1752            | Expression::Atan(f)
1753            | Expression::IsNan(f)
1754            | Expression::IsInf(f)
1755            | Expression::ArrayLength(f)
1756            | Expression::ArraySize(f)
1757            | Expression::Cardinality(f)
1758            | Expression::ArrayReverse(f)
1759            | Expression::ArrayDistinct(f)
1760            | Expression::ArrayFlatten(f)
1761            | Expression::ArrayCompact(f)
1762            | Expression::Explode(f)
1763            | Expression::ExplodeOuter(f)
1764            | Expression::ToArray(f)
1765            | Expression::MapFromEntries(f)
1766            | Expression::MapKeys(f)
1767            | Expression::MapValues(f)
1768            | Expression::JsonArrayLength(f)
1769            | Expression::JsonKeys(f)
1770            | Expression::JsonType(f)
1771            | Expression::ParseJson(f)
1772            | Expression::ToJson(f)
1773            | Expression::Typeof(f)
1774            | Expression::BitwiseCount(f)
1775            | Expression::Year(f)
1776            | Expression::Month(f)
1777            | Expression::Day(f)
1778            | Expression::Hour(f)
1779            | Expression::Minute(f)
1780            | Expression::Second(f)
1781            | Expression::DayOfWeek(f)
1782            | Expression::DayOfWeekIso(f)
1783            | Expression::DayOfMonth(f)
1784            | Expression::DayOfYear(f)
1785            | Expression::WeekOfYear(f)
1786            | Expression::Quarter(f)
1787            | Expression::Epoch(f)
1788            | Expression::EpochMs(f)
1789            | Expression::TimeStrToUnix(f)
1790            | Expression::SHA(f)
1791            | Expression::SHA1Digest(f)
1792            | Expression::TimeToUnix(f)
1793            | Expression::JSONBool(f)
1794            | Expression::Int64(f)
1795            | Expression::MD5NumberLower64(f)
1796            | Expression::MD5NumberUpper64(f)
1797            | Expression::DateStrToDate(f)
1798            | Expression::DateToDateStr(f) => {
1799                stack.push(&f.this);
1800            }
1801
1802            // === BinaryFunc variants: this, expression ===
1803            Expression::Power(f)
1804            | Expression::NullIf(f)
1805            | Expression::IfNull(f)
1806            | Expression::Nvl(f)
1807            | Expression::UnixToTimeStr(f)
1808            | Expression::Contains(f)
1809            | Expression::StartsWith(f)
1810            | Expression::EndsWith(f)
1811            | Expression::Levenshtein(f)
1812            | Expression::ModFunc(f)
1813            | Expression::Atan2(f)
1814            | Expression::IntDiv(f)
1815            | Expression::AddMonths(f)
1816            | Expression::MonthsBetween(f)
1817            | Expression::NextDay(f)
1818            | Expression::ArrayContains(f)
1819            | Expression::ArrayPosition(f)
1820            | Expression::ArrayAppend(f)
1821            | Expression::ArrayPrepend(f)
1822            | Expression::ArrayUnion(f)
1823            | Expression::ArrayExcept(f)
1824            | Expression::ArrayRemove(f)
1825            | Expression::StarMap(f)
1826            | Expression::MapFromArrays(f)
1827            | Expression::MapContainsKey(f)
1828            | Expression::ElementAt(f)
1829            | Expression::JsonMergePatch(f)
1830            | Expression::JSONBContains(f)
1831            | Expression::JSONBExtract(f) => {
1832                stack.push(&f.this);
1833                stack.push(&f.expression);
1834            }
1835
1836            // === VarArgFunc variants: expressions ===
1837            Expression::Greatest(f)
1838            | Expression::Least(f)
1839            | Expression::Coalesce(f)
1840            | Expression::ArrayConcat(f)
1841            | Expression::ArrayIntersect(f)
1842            | Expression::ArrayZip(f)
1843            | Expression::MapConcat(f)
1844            | Expression::JsonArray(f) => {
1845                for e in &f.expressions {
1846                    stack.push(e);
1847                }
1848            }
1849
1850            // === AggFunc variants: this, filter, having_max, limit ===
1851            Expression::Sum(f)
1852            | Expression::Avg(f)
1853            | Expression::Min(f)
1854            | Expression::Max(f)
1855            | Expression::ArrayAgg(f)
1856            | Expression::CountIf(f)
1857            | Expression::Stddev(f)
1858            | Expression::StddevPop(f)
1859            | Expression::StddevSamp(f)
1860            | Expression::Variance(f)
1861            | Expression::VarPop(f)
1862            | Expression::VarSamp(f)
1863            | Expression::Median(f)
1864            | Expression::Mode(f)
1865            | Expression::First(f)
1866            | Expression::Last(f)
1867            | Expression::AnyValue(f)
1868            | Expression::ApproxDistinct(f)
1869            | Expression::ApproxCountDistinct(f)
1870            | Expression::LogicalAnd(f)
1871            | Expression::LogicalOr(f)
1872            | Expression::Skewness(f)
1873            | Expression::ArrayConcatAgg(f)
1874            | Expression::ArrayUniqueAgg(f)
1875            | Expression::BoolXorAgg(f)
1876            | Expression::BitwiseAndAgg(f)
1877            | Expression::BitwiseOrAgg(f)
1878            | Expression::BitwiseXorAgg(f) => {
1879                stack.push(&f.this);
1880                if let Some(ref filter) = f.filter {
1881                    stack.push(filter);
1882                }
1883                if let Some((ref expr, _)) = f.having_max {
1884                    stack.push(expr);
1885                }
1886                if let Some(ref limit) = f.limit {
1887                    stack.push(limit);
1888                }
1889            }
1890
1891            // === Generic Function / AggregateFunction: args ===
1892            Expression::Function(func) => {
1893                for arg in &func.args {
1894                    stack.push(arg);
1895                }
1896            }
1897            Expression::AggregateFunction(func) => {
1898                for arg in &func.args {
1899                    stack.push(arg);
1900                }
1901                if let Some(ref filter) = func.filter {
1902                    stack.push(filter);
1903                }
1904                if let Some(ref limit) = func.limit {
1905                    stack.push(limit);
1906                }
1907            }
1908
1909            // === WindowFunction: this (skip Over for lineage purposes) ===
1910            Expression::WindowFunction(wf) => {
1911                stack.push(&wf.this);
1912            }
1913
1914            // === Containers and special expressions ===
1915            Expression::Alias(a) => {
1916                stack.push(&a.this);
1917            }
1918            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1919                stack.push(&c.this);
1920                if let Some(ref fmt) = c.format {
1921                    stack.push(fmt);
1922                }
1923                if let Some(ref def) = c.default {
1924                    stack.push(def);
1925                }
1926            }
1927            Expression::Paren(p) => {
1928                stack.push(&p.this);
1929            }
1930            Expression::Annotated(a) => {
1931                stack.push(&a.this);
1932            }
1933            Expression::Case(case) => {
1934                if let Some(ref operand) = case.operand {
1935                    stack.push(operand);
1936                }
1937                for (cond, result) in &case.whens {
1938                    stack.push(cond);
1939                    stack.push(result);
1940                }
1941                if let Some(ref else_expr) = case.else_ {
1942                    stack.push(else_expr);
1943                }
1944            }
1945            Expression::Collation(c) => {
1946                stack.push(&c.this);
1947            }
1948            Expression::In(i) => {
1949                stack.push(&i.this);
1950                for e in &i.expressions {
1951                    stack.push(e);
1952                }
1953                if let Some(ref q) = i.query {
1954                    stack.push(q);
1955                }
1956                if let Some(ref u) = i.unnest {
1957                    stack.push(u);
1958                }
1959            }
1960            Expression::Between(b) => {
1961                stack.push(&b.this);
1962                stack.push(&b.low);
1963                stack.push(&b.high);
1964            }
1965            Expression::IsNull(n) => {
1966                stack.push(&n.this);
1967            }
1968            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1969                stack.push(&t.this);
1970            }
1971            Expression::IsJson(j) => {
1972                stack.push(&j.this);
1973            }
1974            Expression::Like(l) | Expression::ILike(l) => {
1975                stack.push(&l.left);
1976                stack.push(&l.right);
1977                if let Some(ref esc) = l.escape {
1978                    stack.push(esc);
1979                }
1980            }
1981            Expression::SimilarTo(s) => {
1982                stack.push(&s.this);
1983                stack.push(&s.pattern);
1984                if let Some(ref esc) = s.escape {
1985                    stack.push(esc);
1986                }
1987            }
1988            Expression::Ordered(o) => {
1989                stack.push(&o.this);
1990            }
1991            Expression::Array(a) => {
1992                for e in &a.expressions {
1993                    stack.push(e);
1994                }
1995            }
1996            Expression::Tuple(t) => {
1997                for e in &t.expressions {
1998                    stack.push(e);
1999                }
2000            }
2001            Expression::Struct(s) => {
2002                for (_, e) in &s.fields {
2003                    stack.push(e);
2004                }
2005            }
2006            Expression::Subscript(s) => {
2007                stack.push(&s.this);
2008                stack.push(&s.index);
2009            }
2010            Expression::Dot(d) => {
2011                stack.push(&d.this);
2012            }
2013            Expression::MethodCall(m) => {
2014                if !matches!(dialect, Some(DialectType::BigQuery))
2015                    || !is_bigquery_safe_namespace_receiver(&m.this)
2016                {
2017                    stack.push(&m.this);
2018                }
2019                for arg in &m.args {
2020                    stack.push(arg);
2021                }
2022            }
2023            Expression::ArraySlice(s) => {
2024                stack.push(&s.this);
2025                if let Some(ref start) = s.start {
2026                    stack.push(start);
2027                }
2028                if let Some(ref end) = s.end {
2029                    stack.push(end);
2030                }
2031            }
2032            Expression::Lambda(l) => {
2033                stack.push(&l.body);
2034            }
2035            Expression::NamedArgument(n) => {
2036                stack.push(&n.value);
2037            }
2038            Expression::Lateral(l) => {
2039                stack.push(&l.this);
2040                if let Some(ref view) = l.view {
2041                    stack.push(view);
2042                }
2043                if let Some(ref outer) = l.outer {
2044                    stack.push(outer);
2045                }
2046                if let Some(ref ordinality) = l.ordinality {
2047                    stack.push(ordinality);
2048                }
2049            }
2050            Expression::LateralView(lv) => {
2051                stack.push(&lv.this);
2052            }
2053            Expression::TryCatch(t) => {
2054                for stmt in &t.try_body {
2055                    stack.push(stmt);
2056                }
2057                if let Some(catch_body) = &t.catch_body {
2058                    for stmt in catch_body {
2059                        stack.push(stmt);
2060                    }
2061                }
2062            }
2063            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2064                stack.push(e);
2065            }
2066
2067            // === Custom function structs ===
2068            Expression::Substring(f) => {
2069                stack.push(&f.this);
2070                stack.push(&f.start);
2071                if let Some(ref len) = f.length {
2072                    stack.push(len);
2073                }
2074            }
2075            Expression::Trim(f) => {
2076                stack.push(&f.this);
2077                if let Some(ref chars) = f.characters {
2078                    stack.push(chars);
2079                }
2080            }
2081            Expression::Replace(f) => {
2082                stack.push(&f.this);
2083                stack.push(&f.old);
2084                stack.push(&f.new);
2085            }
2086            Expression::IfFunc(f) => {
2087                stack.push(&f.condition);
2088                stack.push(&f.true_value);
2089                if let Some(ref fv) = f.false_value {
2090                    stack.push(fv);
2091                }
2092            }
2093            Expression::Nvl2(f) => {
2094                stack.push(&f.this);
2095                stack.push(&f.true_value);
2096                stack.push(&f.false_value);
2097            }
2098            Expression::ConcatWs(f) => {
2099                stack.push(&f.separator);
2100                for e in &f.expressions {
2101                    stack.push(e);
2102                }
2103            }
2104            Expression::Count(f) => {
2105                if let Some(ref this) = f.this {
2106                    stack.push(this);
2107                }
2108                if let Some(ref filter) = f.filter {
2109                    stack.push(filter);
2110                }
2111            }
2112            Expression::GroupConcat(f) => {
2113                stack.push(&f.this);
2114                if let Some(ref sep) = f.separator {
2115                    stack.push(sep);
2116                }
2117                if let Some(ref filter) = f.filter {
2118                    stack.push(filter);
2119                }
2120            }
2121            Expression::StringAgg(f) => {
2122                stack.push(&f.this);
2123                if let Some(ref sep) = f.separator {
2124                    stack.push(sep);
2125                }
2126                if let Some(ref filter) = f.filter {
2127                    stack.push(filter);
2128                }
2129                if let Some(ref limit) = f.limit {
2130                    stack.push(limit);
2131                }
2132            }
2133            Expression::ListAgg(f) => {
2134                stack.push(&f.this);
2135                if let Some(ref sep) = f.separator {
2136                    stack.push(sep);
2137                }
2138                if let Some(ref filter) = f.filter {
2139                    stack.push(filter);
2140                }
2141            }
2142            Expression::SumIf(f) => {
2143                stack.push(&f.this);
2144                stack.push(&f.condition);
2145                if let Some(ref filter) = f.filter {
2146                    stack.push(filter);
2147                }
2148            }
2149            Expression::DateAdd(f) | Expression::DateSub(f) => {
2150                stack.push(&f.this);
2151                stack.push(&f.interval);
2152            }
2153            Expression::DateDiff(f) => {
2154                stack.push(&f.this);
2155                stack.push(&f.expression);
2156            }
2157            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2158                stack.push(&f.this);
2159            }
2160            Expression::Extract(f) => {
2161                stack.push(&f.this);
2162            }
2163            Expression::Round(f) => {
2164                stack.push(&f.this);
2165                if let Some(ref d) = f.decimals {
2166                    stack.push(d);
2167                }
2168            }
2169            Expression::Floor(f) => {
2170                stack.push(&f.this);
2171                if let Some(ref s) = f.scale {
2172                    stack.push(s);
2173                }
2174                if let Some(ref t) = f.to {
2175                    stack.push(t);
2176                }
2177            }
2178            Expression::Ceil(f) => {
2179                stack.push(&f.this);
2180                if let Some(ref d) = f.decimals {
2181                    stack.push(d);
2182                }
2183                if let Some(ref t) = f.to {
2184                    stack.push(t);
2185                }
2186            }
2187            Expression::Log(f) => {
2188                stack.push(&f.this);
2189                if let Some(ref b) = f.base {
2190                    stack.push(b);
2191                }
2192            }
2193            Expression::AtTimeZone(f) => {
2194                stack.push(&f.this);
2195                stack.push(&f.zone);
2196            }
2197            Expression::Lead(f) | Expression::Lag(f) => {
2198                stack.push(&f.this);
2199                if let Some(ref off) = f.offset {
2200                    stack.push(off);
2201                }
2202                if let Some(ref def) = f.default {
2203                    stack.push(def);
2204                }
2205            }
2206            Expression::FirstValue(f) | Expression::LastValue(f) => {
2207                stack.push(&f.this);
2208            }
2209            Expression::NthValue(f) => {
2210                stack.push(&f.this);
2211                stack.push(&f.offset);
2212            }
2213            Expression::Position(f) => {
2214                stack.push(&f.substring);
2215                stack.push(&f.string);
2216                if let Some(ref start) = f.start {
2217                    stack.push(start);
2218                }
2219            }
2220            Expression::Decode(f) => {
2221                stack.push(&f.this);
2222                for (search, result) in &f.search_results {
2223                    stack.push(search);
2224                    stack.push(result);
2225                }
2226                if let Some(ref def) = f.default {
2227                    stack.push(def);
2228                }
2229            }
2230            Expression::CharFunc(f) => {
2231                for arg in &f.args {
2232                    stack.push(arg);
2233                }
2234            }
2235            Expression::ArraySort(f) => {
2236                stack.push(&f.this);
2237                if let Some(ref cmp) = f.comparator {
2238                    stack.push(cmp);
2239                }
2240            }
2241            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2242                stack.push(&f.this);
2243                stack.push(&f.separator);
2244                if let Some(ref nr) = f.null_replacement {
2245                    stack.push(nr);
2246                }
2247            }
2248            Expression::ArrayFilter(f) => {
2249                stack.push(&f.this);
2250                stack.push(&f.filter);
2251            }
2252            Expression::ArrayTransform(f) => {
2253                stack.push(&f.this);
2254                stack.push(&f.transform);
2255            }
2256            Expression::Sequence(f)
2257            | Expression::Generate(f)
2258            | Expression::ExplodingGenerateSeries(f) => {
2259                stack.push(&f.start);
2260                stack.push(&f.stop);
2261                if let Some(ref step) = f.step {
2262                    stack.push(step);
2263                }
2264            }
2265            Expression::JsonExtract(f)
2266            | Expression::JsonExtractScalar(f)
2267            | Expression::JsonQuery(f)
2268            | Expression::JsonValue(f) => {
2269                stack.push(&f.this);
2270                stack.push(&f.path);
2271            }
2272            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2273                stack.push(&f.this);
2274                for p in &f.paths {
2275                    stack.push(p);
2276                }
2277            }
2278            Expression::JsonObject(f) => {
2279                for (k, v) in &f.pairs {
2280                    stack.push(k);
2281                    stack.push(v);
2282                }
2283            }
2284            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2285                stack.push(&f.this);
2286                for (path, val) in &f.path_values {
2287                    stack.push(path);
2288                    stack.push(val);
2289                }
2290            }
2291            Expression::Overlay(f) => {
2292                stack.push(&f.this);
2293                stack.push(&f.replacement);
2294                stack.push(&f.from);
2295                if let Some(ref len) = f.length {
2296                    stack.push(len);
2297                }
2298            }
2299            Expression::Convert(f) => {
2300                stack.push(&f.this);
2301                if let Some(ref style) = f.style {
2302                    stack.push(style);
2303                }
2304            }
2305            Expression::ApproxPercentile(f) => {
2306                stack.push(&f.this);
2307                stack.push(&f.percentile);
2308                if let Some(ref acc) = f.accuracy {
2309                    stack.push(acc);
2310                }
2311                if let Some(ref filter) = f.filter {
2312                    stack.push(filter);
2313                }
2314            }
2315            Expression::Percentile(f)
2316            | Expression::PercentileCont(f)
2317            | Expression::PercentileDisc(f) => {
2318                stack.push(&f.this);
2319                stack.push(&f.percentile);
2320                if let Some(ref filter) = f.filter {
2321                    stack.push(filter);
2322                }
2323            }
2324            Expression::WithinGroup(f) => {
2325                stack.push(&f.this);
2326            }
2327            Expression::Left(f) | Expression::Right(f) => {
2328                stack.push(&f.this);
2329                stack.push(&f.length);
2330            }
2331            Expression::Repeat(f) => {
2332                stack.push(&f.this);
2333                stack.push(&f.times);
2334            }
2335            Expression::Lpad(f) | Expression::Rpad(f) => {
2336                stack.push(&f.this);
2337                stack.push(&f.length);
2338                if let Some(ref fill) = f.fill {
2339                    stack.push(fill);
2340                }
2341            }
2342            Expression::Split(f) => {
2343                stack.push(&f.this);
2344                stack.push(&f.delimiter);
2345            }
2346            Expression::RegexpLike(f) => {
2347                stack.push(&f.this);
2348                stack.push(&f.pattern);
2349                if let Some(ref flags) = f.flags {
2350                    stack.push(flags);
2351                }
2352            }
2353            Expression::RegexpReplace(f) => {
2354                stack.push(&f.this);
2355                stack.push(&f.pattern);
2356                stack.push(&f.replacement);
2357                if let Some(ref flags) = f.flags {
2358                    stack.push(flags);
2359                }
2360            }
2361            Expression::RegexpExtract(f) => {
2362                stack.push(&f.this);
2363                stack.push(&f.pattern);
2364                if let Some(ref group) = f.group {
2365                    stack.push(group);
2366                }
2367            }
2368            Expression::ToDate(f) => {
2369                stack.push(&f.this);
2370                if let Some(ref fmt) = f.format {
2371                    stack.push(fmt);
2372                }
2373            }
2374            Expression::ToTimestamp(f) => {
2375                stack.push(&f.this);
2376                if let Some(ref fmt) = f.format {
2377                    stack.push(fmt);
2378                }
2379            }
2380            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2381                stack.push(&f.this);
2382                stack.push(&f.format);
2383            }
2384            Expression::LastDay(f) => {
2385                stack.push(&f.this);
2386            }
2387            Expression::FromUnixtime(f) => {
2388                stack.push(&f.this);
2389                if let Some(ref fmt) = f.format {
2390                    stack.push(fmt);
2391                }
2392            }
2393            Expression::UnixTimestamp(f) => {
2394                if let Some(ref this) = f.this {
2395                    stack.push(this);
2396                }
2397                if let Some(ref fmt) = f.format {
2398                    stack.push(fmt);
2399                }
2400            }
2401            Expression::MakeDate(f) => {
2402                stack.push(&f.year);
2403                stack.push(&f.month);
2404                stack.push(&f.day);
2405            }
2406            Expression::MakeTimestamp(f) => {
2407                stack.push(&f.year);
2408                stack.push(&f.month);
2409                stack.push(&f.day);
2410                stack.push(&f.hour);
2411                stack.push(&f.minute);
2412                stack.push(&f.second);
2413                if let Some(ref tz) = f.timezone {
2414                    stack.push(tz);
2415                }
2416            }
2417            Expression::TruncFunc(f) => {
2418                stack.push(&f.this);
2419                if let Some(ref d) = f.decimals {
2420                    stack.push(d);
2421                }
2422            }
2423            Expression::ArrayFunc(f) => {
2424                for e in &f.expressions {
2425                    stack.push(e);
2426                }
2427            }
2428            Expression::Unnest(f) => {
2429                stack.push(&f.this);
2430                for e in &f.expressions {
2431                    stack.push(e);
2432                }
2433            }
2434            Expression::StructFunc(f) => {
2435                for (_, e) in &f.fields {
2436                    stack.push(e);
2437                }
2438            }
2439            Expression::StructExtract(f) => {
2440                stack.push(&f.this);
2441            }
2442            Expression::NamedStruct(f) => {
2443                for (k, v) in &f.pairs {
2444                    stack.push(k);
2445                    stack.push(v);
2446                }
2447            }
2448            Expression::MapFunc(f) => {
2449                for k in &f.keys {
2450                    stack.push(k);
2451                }
2452                for v in &f.values {
2453                    stack.push(v);
2454                }
2455            }
2456            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2457                stack.push(&f.this);
2458                stack.push(&f.transform);
2459            }
2460            Expression::JsonArrayAgg(f) => {
2461                stack.push(&f.this);
2462                if let Some(ref filter) = f.filter {
2463                    stack.push(filter);
2464                }
2465            }
2466            Expression::JsonObjectAgg(f) => {
2467                stack.push(&f.key);
2468                stack.push(&f.value);
2469                if let Some(ref filter) = f.filter {
2470                    stack.push(filter);
2471                }
2472            }
2473            Expression::NTile(f) => {
2474                if let Some(ref n) = f.num_buckets {
2475                    stack.push(n);
2476                }
2477            }
2478            Expression::Rand(f) => {
2479                if let Some(ref s) = f.seed {
2480                    stack.push(s);
2481                }
2482                if let Some(ref lo) = f.lower {
2483                    stack.push(lo);
2484                }
2485                if let Some(ref hi) = f.upper {
2486                    stack.push(hi);
2487                }
2488            }
2489            Expression::Any(q) | Expression::All(q) => {
2490                stack.push(&q.this);
2491                stack.push(&q.subquery);
2492            }
2493            Expression::Overlaps(o) => {
2494                if let Some(ref this) = o.this {
2495                    stack.push(this);
2496                }
2497                if let Some(ref expr) = o.expression {
2498                    stack.push(expr);
2499                }
2500                if let Some(ref ls) = o.left_start {
2501                    stack.push(ls);
2502                }
2503                if let Some(ref le) = o.left_end {
2504                    stack.push(le);
2505                }
2506                if let Some(ref rs) = o.right_start {
2507                    stack.push(rs);
2508                }
2509                if let Some(ref re) = o.right_end {
2510                    stack.push(re);
2511                }
2512            }
2513            Expression::Interval(i) => {
2514                if let Some(ref this) = i.this {
2515                    stack.push(this);
2516                }
2517            }
2518            Expression::TimeStrToTime(f) => {
2519                stack.push(&f.this);
2520                if let Some(ref zone) = f.zone {
2521                    stack.push(zone);
2522                }
2523            }
2524            Expression::JSONBExtractScalar(f) => {
2525                stack.push(&f.this);
2526                stack.push(&f.expression);
2527                if let Some(ref jt) = f.json_type {
2528                    stack.push(jt);
2529                }
2530            }
2531            Expression::JSONExtract(f) => {
2532                stack.push(&f.this);
2533                stack.push(&f.expression);
2534                for e in &f.expressions {
2535                    stack.push(e);
2536                }
2537                if let Some(ref option) = f.option {
2538                    stack.push(option);
2539                }
2540                if let Some(ref on_condition) = f.on_condition {
2541                    stack.push(on_condition);
2542                }
2543            }
2544
2545            // === True leaves and non-expression-bearing nodes ===
2546            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2547            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2548            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2549            _ => {}
2550        }
2551    }
2552}
2553
2554// ---------------------------------------------------------------------------
2555// Tests
2556// ---------------------------------------------------------------------------
2557
2558#[cfg(test)]
2559mod tests {
2560    use super::*;
2561    use crate::dialects::{Dialect, DialectType};
2562    use crate::expressions::DataType;
2563    use crate::optimizer::annotate_types::annotate_types;
2564    use crate::parse_one;
2565    use crate::schema::{MappingSchema, Schema};
2566
2567    fn parse(sql: &str) -> Expression {
2568        let dialect = Dialect::get(DialectType::Generic);
2569        let ast = dialect.parse(sql).unwrap();
2570        ast.into_iter().next().unwrap()
2571    }
2572
2573    #[test]
2574    fn test_simple_lineage() {
2575        let expr = parse("SELECT a FROM t");
2576        let node = lineage("a", &expr, None, false).unwrap();
2577
2578        assert_eq!(node.name, "a");
2579        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2580        // Should trace to t.a
2581        let names = node.downstream_names();
2582        assert!(
2583            names.iter().any(|n| n == "t.a"),
2584            "Expected t.a in downstream, got: {:?}",
2585            names
2586        );
2587    }
2588
2589    #[test]
2590    fn test_lineage_walk() {
2591        let root = LineageNode {
2592            name: "col_a".to_string(),
2593            expression: Expression::Null(crate::expressions::Null),
2594            source: Expression::Null(crate::expressions::Null),
2595            downstream: vec![LineageNode::new(
2596                "t.a",
2597                Expression::Null(crate::expressions::Null),
2598                Expression::Null(crate::expressions::Null),
2599            )],
2600            source_name: String::new(),
2601            source_kind: SourceKind::Unknown,
2602            source_alias: None,
2603            reference_node_name: String::new(),
2604        };
2605
2606        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2607        assert_eq!(names.len(), 2);
2608        assert_eq!(names[0], "col_a");
2609        assert_eq!(names[1], "t.a");
2610    }
2611
2612    #[test]
2613    fn test_aliased_column() {
2614        let expr = parse("SELECT a + 1 AS b FROM t");
2615        let node = lineage("b", &expr, None, false).unwrap();
2616
2617        assert_eq!(node.name, "b");
2618        // Should trace through the expression to t.a
2619        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2620        assert!(
2621            all_names.iter().any(|n| n.contains("a")),
2622            "Expected to trace to column a, got: {:?}",
2623            all_names
2624        );
2625    }
2626
2627    #[test]
2628    fn test_qualified_column() {
2629        let expr = parse("SELECT t.a FROM t");
2630        let node = lineage("a", &expr, None, false).unwrap();
2631
2632        assert_eq!(node.name, "a");
2633        let names = node.downstream_names();
2634        assert!(
2635            names.iter().any(|n| n == "t.a"),
2636            "Expected t.a, got: {:?}",
2637            names
2638        );
2639    }
2640
2641    #[test]
2642    fn test_unqualified_column() {
2643        let expr = parse("SELECT a FROM t");
2644        let node = lineage("a", &expr, None, false).unwrap();
2645
2646        // Unqualified but single source → resolved to t.a
2647        let names = node.downstream_names();
2648        assert!(
2649            names.iter().any(|n| n == "t.a"),
2650            "Expected t.a, got: {:?}",
2651            names
2652        );
2653    }
2654
2655    #[test]
2656    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2657        let query = "SELECT name FROM users";
2658        let dialect = Dialect::get(DialectType::BigQuery);
2659        let expr = dialect
2660            .parse(query)
2661            .unwrap()
2662            .into_iter()
2663            .next()
2664            .expect("expected one expression");
2665
2666        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2667        schema
2668            .add_table("users", &[("name".into(), DataType::Text)], None)
2669            .expect("schema setup");
2670
2671        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2672            .expect("lineage without schema");
2673        let mut expr_without = node_without_schema.expression.clone();
2674        annotate_types(
2675            &mut expr_without,
2676            Some(&schema),
2677            Some(DialectType::BigQuery),
2678        );
2679        assert_eq!(
2680            expr_without.inferred_type(),
2681            None,
2682            "Expected unresolved root type without schema-aware lineage qualification"
2683        );
2684
2685        let node_with_schema = lineage_with_schema(
2686            "name",
2687            &expr,
2688            Some(&schema),
2689            Some(DialectType::BigQuery),
2690            false,
2691        )
2692        .expect("lineage with schema");
2693        let mut expr_with = node_with_schema.expression.clone();
2694        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2695
2696        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2697    }
2698
2699    #[test]
2700    fn test_lineage_with_schema_correlated_scalar_subquery() {
2701        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2702        let dialect = Dialect::get(DialectType::BigQuery);
2703        let expr = dialect
2704            .parse(query)
2705            .unwrap()
2706            .into_iter()
2707            .next()
2708            .expect("expected one expression");
2709
2710        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2711        schema
2712            .add_table(
2713                "t1",
2714                &[("id".into(), DataType::BigInt { length: None })],
2715                None,
2716            )
2717            .expect("schema setup");
2718        schema
2719            .add_table(
2720                "t2",
2721                &[
2722                    ("id".into(), DataType::BigInt { length: None }),
2723                    ("val".into(), DataType::BigInt { length: None }),
2724                ],
2725                None,
2726            )
2727            .expect("schema setup");
2728
2729        let node = lineage_with_schema(
2730            "id",
2731            &expr,
2732            Some(&schema),
2733            Some(DialectType::BigQuery),
2734            false,
2735        )
2736        .expect("lineage_with_schema should handle correlated scalar subqueries");
2737
2738        assert_eq!(node.name, "id");
2739    }
2740
2741    #[test]
2742    fn test_lineage_with_schema_join_using() {
2743        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2744        let dialect = Dialect::get(DialectType::BigQuery);
2745        let expr = dialect
2746            .parse(query)
2747            .unwrap()
2748            .into_iter()
2749            .next()
2750            .expect("expected one expression");
2751
2752        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2753        schema
2754            .add_table(
2755                "t1",
2756                &[("a".into(), DataType::BigInt { length: None })],
2757                None,
2758            )
2759            .expect("schema setup");
2760        schema
2761            .add_table(
2762                "t2",
2763                &[("a".into(), DataType::BigInt { length: None })],
2764                None,
2765            )
2766            .expect("schema setup");
2767
2768        let node = lineage_with_schema(
2769            "a",
2770            &expr,
2771            Some(&schema),
2772            Some(DialectType::BigQuery),
2773            false,
2774        )
2775        .expect("lineage_with_schema should handle JOIN USING");
2776
2777        assert_eq!(node.name, "a");
2778    }
2779
2780    #[test]
2781    fn test_lineage_with_schema_qualified_table_name() {
2782        let query = "SELECT a FROM raw.t1";
2783        let dialect = Dialect::get(DialectType::BigQuery);
2784        let expr = dialect
2785            .parse(query)
2786            .unwrap()
2787            .into_iter()
2788            .next()
2789            .expect("expected one expression");
2790
2791        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2792        schema
2793            .add_table(
2794                "raw.t1",
2795                &[("a".into(), DataType::BigInt { length: None })],
2796                None,
2797            )
2798            .expect("schema setup");
2799
2800        let node = lineage_with_schema(
2801            "a",
2802            &expr,
2803            Some(&schema),
2804            Some(DialectType::BigQuery),
2805            false,
2806        )
2807        .expect("lineage_with_schema should handle dotted schema.table names");
2808
2809        assert_eq!(node.name, "a");
2810    }
2811
2812    #[test]
2813    fn test_lineage_with_schema_none_matches_lineage() {
2814        let expr = parse("SELECT a FROM t");
2815        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2816        let with_none =
2817            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2818
2819        assert_eq!(with_none.name, baseline.name);
2820        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2821    }
2822
2823    #[test]
2824    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2825        let dialect = Dialect::get(DialectType::BigQuery);
2826        let expr = dialect
2827            .parse("SELECT Name AS name FROM teams")
2828            .unwrap()
2829            .into_iter()
2830            .next()
2831            .expect("expected one expression");
2832
2833        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2834        schema
2835            .add_table(
2836                "teams",
2837                &[("Name".into(), DataType::String { length: None })],
2838                None,
2839            )
2840            .expect("schema setup");
2841
2842        let node = lineage_with_schema(
2843            "name",
2844            &expr,
2845            Some(&schema),
2846            Some(DialectType::BigQuery),
2847            false,
2848        )
2849        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2850
2851        let names = node.downstream_names();
2852        assert!(
2853            names.iter().any(|n| n == "teams.Name"),
2854            "Expected teams.Name in downstream, got: {:?}",
2855            names
2856        );
2857    }
2858
2859    #[test]
2860    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2861        let dialect = Dialect::get(DialectType::BigQuery);
2862        let expr = dialect
2863            .parse("SELECT Name AS Name FROM teams")
2864            .unwrap()
2865            .into_iter()
2866            .next()
2867            .expect("expected one expression");
2868
2869        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2870            .expect("lineage should resolve mixed-case aliases in BigQuery");
2871
2872        assert_eq!(node.name, "name");
2873    }
2874
2875    #[test]
2876    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2877        let expr = parse_one(
2878            r#"
2879SELECT date_val AS week_start
2880FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2881"#,
2882            DialectType::BigQuery,
2883        )
2884        .expect("parse");
2885
2886        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2887            .expect("lineage should resolve UNNEST alias as a source");
2888        let child = node
2889            .downstream
2890            .first()
2891            .expect("week_start should have downstream lineage");
2892
2893        assert_eq!(child.name, "_0.date_val");
2894        assert_eq!(child.source_name, "_0");
2895        assert_eq!(child.source_kind, SourceKind::Virtual);
2896        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2897
2898        let Expression::Column(column) = &child.expression else {
2899            panic!(
2900                "expected downstream column expression, got {:?}",
2901                child.expression
2902            );
2903        };
2904        assert_eq!(column.name.name, "date_val");
2905        assert_eq!(
2906            column.table.as_ref().map(|table| table.name.as_str()),
2907            Some("_0")
2908        );
2909        assert!(
2910            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2911            "expected UNNEST source expression, got {:?}",
2912            child.source
2913        );
2914    }
2915
2916    #[test]
2917    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2918        let expr =
2919            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2920
2921        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2922        let child = node.downstream.first().expect("id should have lineage");
2923
2924        assert_eq!(child.name, "date_val.id");
2925        assert_eq!(child.source_name, "date_val");
2926        assert_eq!(child.source_kind, SourceKind::Table);
2927        assert_eq!(child.source_alias, None);
2928    }
2929
2930    #[test]
2931    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2932        let expr = parse_one(
2933            r#"
2934SELECT a.a AS first_value, b.b AS second_value
2935FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2936JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2937"#,
2938            DialectType::BigQuery,
2939        )
2940        .expect("parse");
2941
2942        let first =
2943            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2944        let second =
2945            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2946
2947        let first_child = first.downstream.first().expect("first source");
2948        let second_child = second.downstream.first().expect("second source");
2949
2950        assert_eq!(first_child.name, "_0.a");
2951        assert_eq!(first_child.source_name, "_0");
2952        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2953        assert_eq!(first_child.source_kind, SourceKind::Virtual);
2954
2955        assert_eq!(second_child.name, "_1.b");
2956        assert_eq!(second_child.source_name, "_1");
2957        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2958        assert_eq!(second_child.source_kind, SourceKind::Virtual);
2959    }
2960
2961    #[test]
2962    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2963        let expr = parse_one(
2964            r#"
2965SELECT item.item AS item
2966FROM t JOIN UNNEST(t.items) AS item ON TRUE
2967"#,
2968            DialectType::BigQuery,
2969        )
2970        .expect("parse");
2971
2972        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2973        let virtual_child = node.downstream.first().expect("virtual item source");
2974        assert_eq!(virtual_child.name, "_0.item");
2975        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2976
2977        let real_child = virtual_child
2978            .downstream
2979            .first()
2980            .expect("UNNEST(t.items) should depend on t.items");
2981        assert_eq!(real_child.name, "t.items");
2982        assert_eq!(real_child.source_name, "t");
2983        assert_eq!(real_child.source_kind, SourceKind::Table);
2984    }
2985
2986    #[test]
2987    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
2988        let expr = parse_one(
2989            r#"
2990SELECT item AS item
2991FROM t JOIN UNNEST(t.items) AS item ON TRUE
2992"#,
2993            DialectType::BigQuery,
2994        )
2995        .expect("parse");
2996
2997        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2998        let virtual_child = node.downstream.first().expect("virtual item source");
2999        assert_eq!(virtual_child.name, "_0.item");
3000        assert_eq!(virtual_child.source_name, "_0");
3001        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3002        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
3003
3004        let real_child = virtual_child
3005            .downstream
3006            .first()
3007            .expect("UNNEST(t.items) should depend on t.items");
3008        assert_eq!(real_child.name, "t.items");
3009        assert_eq!(real_child.source_name, "t");
3010        assert_eq!(real_child.source_kind, SourceKind::Table);
3011    }
3012
3013    #[test]
3014    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
3015        let cases = [
3016            (
3017                DialectType::PostgreSQL,
3018                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
3019            ),
3020            (
3021                DialectType::Presto,
3022                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3023            ),
3024            (
3025                DialectType::Trino,
3026                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3027            ),
3028        ];
3029
3030        for (dialect, sql) in cases {
3031            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3032            let node = lineage("out", &expr, Some(dialect), false)
3033                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3034            let virtual_child = node
3035                .downstream
3036                .first()
3037                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3038
3039            assert_eq!(
3040                virtual_child.name, "_0.x",
3041                "unexpected virtual child for {dialect:?}"
3042            );
3043            assert_eq!(virtual_child.source_name, "_0");
3044            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3045            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3046
3047            let real_child = virtual_child
3048                .downstream
3049                .first()
3050                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3051            assert_eq!(real_child.name, "t.items");
3052            assert_eq!(real_child.source_kind, SourceKind::Table);
3053        }
3054    }
3055
3056    #[test]
3057    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3058        let cases = [
3059            (
3060                DialectType::Spark,
3061                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3062            ),
3063            (
3064                DialectType::Hive,
3065                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3066            ),
3067        ];
3068
3069        for (dialect, sql) in cases {
3070            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3071            let node = lineage("out", &expr, Some(dialect), false)
3072                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3073            let virtual_child = node
3074                .downstream
3075                .first()
3076                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3077
3078            assert_eq!(virtual_child.name, "_0.x");
3079            assert_eq!(virtual_child.source_name, "_0");
3080            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3081            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3082
3083            let real_child = virtual_child
3084                .downstream
3085                .first()
3086                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3087            assert_eq!(real_child.name, "t.items");
3088            assert_eq!(real_child.source_kind, SourceKind::Table);
3089        }
3090    }
3091
3092    #[test]
3093    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3094        let expr = parse_one(
3095            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3096            DialectType::Snowflake,
3097        )
3098        .expect("parse");
3099
3100        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3101        let virtual_child = node.downstream.first().expect("virtual flatten source");
3102        assert_eq!(virtual_child.name, "_0.value");
3103        assert_eq!(virtual_child.source_name, "_0");
3104        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3105        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3106
3107        let real_child = virtual_child
3108            .downstream
3109            .first()
3110            .expect("FLATTEN input should depend on raw_events.payload");
3111        assert_eq!(real_child.name, "raw_events.payload");
3112        assert_eq!(real_child.source_kind, SourceKind::Table);
3113    }
3114
3115    #[test]
3116    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3117        let expr = parse_one(
3118            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3119            DialectType::Snowflake,
3120        )
3121        .expect("parse");
3122
3123        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3124        schema
3125            .add_table(
3126                "fact.some_daily_metrics",
3127                &[("date_utc".to_string(), DataType::Date)],
3128                None,
3129            )
3130            .expect("schema setup");
3131
3132        let node = lineage_with_schema(
3133            "recency",
3134            &expr,
3135            Some(&schema),
3136            Some(DialectType::Snowflake),
3137            false,
3138        )
3139        .expect("lineage_with_schema should not treat date part as a column");
3140
3141        let names = node.downstream_names();
3142        assert!(
3143            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3144            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3145            names
3146        );
3147        assert!(
3148            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3149            "Did not expect date part to appear as lineage column, got: {:?}",
3150            names
3151        );
3152    }
3153
3154    #[test]
3155    fn test_snowflake_datediff_parses_to_typed_ast() {
3156        let expr = parse_one(
3157            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3158            DialectType::Snowflake,
3159        )
3160        .expect("parse");
3161
3162        match expr {
3163            Expression::Select(select) => match &select.expressions[0] {
3164                Expression::Alias(alias) => match &alias.this {
3165                    Expression::DateDiff(f) => {
3166                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3167                    }
3168                    other => panic!("expected DateDiff, got {other:?}"),
3169                },
3170                other => panic!("expected Alias, got {other:?}"),
3171            },
3172            other => panic!("expected Select, got {other:?}"),
3173        }
3174    }
3175
3176    #[test]
3177    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3178        let expr = parse_one(
3179            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3180            DialectType::Snowflake,
3181        )
3182        .expect("parse");
3183
3184        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3185        schema
3186            .add_table(
3187                "fact.some_daily_metrics",
3188                &[("date_utc".to_string(), DataType::Date)],
3189                None,
3190            )
3191            .expect("schema setup");
3192
3193        let node = lineage_with_schema(
3194            "next_day",
3195            &expr,
3196            Some(&schema),
3197            Some(DialectType::Snowflake),
3198            false,
3199        )
3200        .expect("lineage_with_schema should not treat DATEADD date part as a column");
3201
3202        let names = node.downstream_names();
3203        assert!(
3204            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3205            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3206            names
3207        );
3208        assert!(
3209            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3210            "Did not expect date part to appear as lineage column, got: {:?}",
3211            names
3212        );
3213    }
3214
3215    #[test]
3216    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3217        let expr = parse_one(
3218            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3219            DialectType::Snowflake,
3220        )
3221        .expect("parse");
3222
3223        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3224        schema
3225            .add_table(
3226                "fact.some_daily_metrics",
3227                &[("date_utc".to_string(), DataType::Date)],
3228                None,
3229            )
3230            .expect("schema setup");
3231
3232        let node = lineage_with_schema(
3233            "day_part",
3234            &expr,
3235            Some(&schema),
3236            Some(DialectType::Snowflake),
3237            false,
3238        )
3239        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3240
3241        let names = node.downstream_names();
3242        assert!(
3243            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3244            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3245            names
3246        );
3247        assert!(
3248            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3249            "Did not expect date part to appear as lineage column, got: {:?}",
3250            names
3251        );
3252    }
3253
3254    #[test]
3255    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3256        let expr = parse_one(
3257            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3258            DialectType::Snowflake,
3259        )
3260        .expect("parse");
3261
3262        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3263        schema
3264            .add_table(
3265                "fact.some_daily_metrics",
3266                &[("date_utc".to_string(), DataType::Date)],
3267                None,
3268            )
3269            .expect("schema setup");
3270
3271        let node = lineage_with_schema(
3272            "day_part",
3273            &expr,
3274            Some(&schema),
3275            Some(DialectType::Snowflake),
3276            false,
3277        )
3278        .expect("quoted DATE_PART should continue to work");
3279
3280        let names = node.downstream_names();
3281        assert!(
3282            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3283            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3284            names
3285        );
3286    }
3287
3288    #[test]
3289    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3290        let expr = parse_one(
3291            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3292            DialectType::Snowflake,
3293        )
3294        .expect("parse");
3295
3296        match expr {
3297            Expression::Select(select) => match &select.expressions[0] {
3298                Expression::Alias(alias) => match &alias.this {
3299                    Expression::Function(f) => {
3300                        assert_eq!(f.name.to_uppercase(), "DATEADD");
3301                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3302                    }
3303                    other => panic!("expected generic DATEADD function, got {other:?}"),
3304                },
3305                other => panic!("expected Alias, got {other:?}"),
3306            },
3307            other => panic!("expected Select, got {other:?}"),
3308        }
3309    }
3310
3311    #[test]
3312    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3313        let expr = parse_one(
3314            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3315            DialectType::Snowflake,
3316        )
3317        .expect("parse");
3318
3319        match expr {
3320            Expression::Select(select) => match &select.expressions[0] {
3321                Expression::Alias(alias) => match &alias.this {
3322                    Expression::Function(f) => {
3323                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3324                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3325                    }
3326                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3327                },
3328                other => panic!("expected Alias, got {other:?}"),
3329            },
3330            other => panic!("expected Select, got {other:?}"),
3331        }
3332    }
3333
3334    #[test]
3335    fn test_snowflake_date_part_string_literal_stays_generic_function() {
3336        let expr = parse_one(
3337            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3338            DialectType::Snowflake,
3339        )
3340        .expect("parse");
3341
3342        match expr {
3343            Expression::Select(select) => match &select.expressions[0] {
3344                Expression::Alias(alias) => match &alias.this {
3345                    Expression::Function(f) => {
3346                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3347                    }
3348                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3349                },
3350                other => panic!("expected Alias, got {other:?}"),
3351            },
3352            other => panic!("expected Select, got {other:?}"),
3353        }
3354    }
3355
3356    #[test]
3357    fn test_lineage_join() {
3358        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3359
3360        let node_a = lineage("a", &expr, None, false).unwrap();
3361        let names_a = node_a.downstream_names();
3362        assert!(
3363            names_a.iter().any(|n| n == "t.a"),
3364            "Expected t.a, got: {:?}",
3365            names_a
3366        );
3367
3368        let node_b = lineage("b", &expr, None, false).unwrap();
3369        let names_b = node_b.downstream_names();
3370        assert!(
3371            names_b.iter().any(|n| n == "s.b"),
3372            "Expected s.b, got: {:?}",
3373            names_b
3374        );
3375    }
3376
3377    #[test]
3378    fn test_lineage_alias_leaf_has_resolved_source_name() {
3379        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3380        let node = lineage("col1", &expr, None, false).unwrap();
3381
3382        // Keep alias in the display lineage edge.
3383        let names = node.downstream_names();
3384        assert!(
3385            names.iter().any(|n| n == "t1.col1"),
3386            "Expected aliased column edge t1.col1, got: {:?}",
3387            names
3388        );
3389
3390        // Leaf should expose the resolved base table for consumers.
3391        let leaf = node
3392            .downstream
3393            .iter()
3394            .find(|n| n.name == "t1.col1")
3395            .expect("Expected t1.col1 leaf");
3396        assert_eq!(leaf.source_name, "table1");
3397        match &leaf.source {
3398            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3399            _ => panic!("Expected leaf source to be a table expression"),
3400        }
3401    }
3402
3403    #[test]
3404    fn test_lineage_derived_table() {
3405        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3406        let node = lineage("a", &expr, None, false).unwrap();
3407
3408        assert_eq!(node.name, "a");
3409        // Should trace through the derived table to t.a
3410        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3411        assert!(
3412            all_names.iter().any(|n| n == "t.a"),
3413            "Expected to trace through derived table to t.a, got: {:?}",
3414            all_names
3415        );
3416    }
3417
3418    #[test]
3419    fn test_lineage_cte() {
3420        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3421        let node = lineage("a", &expr, None, false).unwrap();
3422
3423        assert_eq!(node.name, "a");
3424        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3425        assert!(
3426            all_names.iter().any(|n| n == "t.a"),
3427            "Expected to trace through CTE to t.a, got: {:?}",
3428            all_names
3429        );
3430    }
3431
3432    #[test]
3433    fn test_lineage_union() {
3434        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3435        let node = lineage("a", &expr, None, false).unwrap();
3436
3437        assert_eq!(node.name, "a");
3438        // Should have 2 downstream branches
3439        assert_eq!(
3440            node.downstream.len(),
3441            2,
3442            "Expected 2 branches for UNION, got {}",
3443            node.downstream.len()
3444        );
3445    }
3446
3447    #[test]
3448    fn test_lineage_cte_union() {
3449        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3450        let node = lineage("a", &expr, None, false).unwrap();
3451
3452        // Should trace through CTE into both UNION branches
3453        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3454        assert!(
3455            all_names.len() >= 3,
3456            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3457            all_names
3458        );
3459    }
3460
3461    #[test]
3462    fn test_lineage_star() {
3463        let expr = parse("SELECT * FROM t");
3464        let node = lineage("*", &expr, None, false).unwrap();
3465
3466        assert_eq!(node.name, "*");
3467        // Should have downstream for table t
3468        assert!(
3469            !node.downstream.is_empty(),
3470            "Star should produce downstream nodes"
3471        );
3472    }
3473
3474    #[test]
3475    fn test_lineage_subquery_in_select() {
3476        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3477        let node = lineage("x", &expr, None, false).unwrap();
3478
3479        assert_eq!(node.name, "x");
3480        // Should have traced into the scalar subquery
3481        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3482        assert!(
3483            all_names.len() >= 2,
3484            "Expected tracing into scalar subquery, got: {:?}",
3485            all_names
3486        );
3487    }
3488
3489    #[test]
3490    fn test_lineage_multiple_columns() {
3491        let expr = parse("SELECT a, b FROM t");
3492
3493        let node_a = lineage("a", &expr, None, false).unwrap();
3494        let node_b = lineage("b", &expr, None, false).unwrap();
3495
3496        assert_eq!(node_a.name, "a");
3497        assert_eq!(node_b.name, "b");
3498
3499        // Each should trace independently
3500        let names_a = node_a.downstream_names();
3501        let names_b = node_b.downstream_names();
3502        assert!(names_a.iter().any(|n| n == "t.a"));
3503        assert!(names_b.iter().any(|n| n == "t.b"));
3504    }
3505
3506    #[test]
3507    fn test_get_source_tables() {
3508        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3509        let node = lineage("a", &expr, None, false).unwrap();
3510
3511        let tables = get_source_tables(&node);
3512        assert!(
3513            tables.contains("t"),
3514            "Expected source table 't', got: {:?}",
3515            tables
3516        );
3517    }
3518
3519    #[test]
3520    fn test_lineage_column_not_found() {
3521        let expr = parse("SELECT a FROM t");
3522        let result = lineage("nonexistent", &expr, None, false);
3523        assert!(result.is_err());
3524    }
3525
3526    #[test]
3527    fn test_lineage_nested_cte() {
3528        let expr = parse(
3529            "WITH cte1 AS (SELECT a FROM t), \
3530             cte2 AS (SELECT a FROM cte1) \
3531             SELECT a FROM cte2",
3532        );
3533        let node = lineage("a", &expr, None, false).unwrap();
3534
3535        // Should trace through cte2 → cte1 → t
3536        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3537        assert!(
3538            all_names.len() >= 3,
3539            "Expected to trace through nested CTEs, got: {:?}",
3540            all_names
3541        );
3542    }
3543
3544    #[test]
3545    fn test_trim_selects_true() {
3546        let expr = parse("SELECT a, b, c FROM t");
3547        let node = lineage("a", &expr, None, true).unwrap();
3548
3549        // The source should be trimmed to only include 'a'
3550        if let Expression::Select(select) = &node.source {
3551            assert_eq!(
3552                select.expressions.len(),
3553                1,
3554                "Trimmed source should have 1 expression, got {}",
3555                select.expressions.len()
3556            );
3557        } else {
3558            panic!("Expected Select source");
3559        }
3560    }
3561
3562    #[test]
3563    fn test_trim_selects_false() {
3564        let expr = parse("SELECT a, b, c FROM t");
3565        let node = lineage("a", &expr, None, false).unwrap();
3566
3567        // The source should keep all columns
3568        if let Expression::Select(select) = &node.source {
3569            assert_eq!(
3570                select.expressions.len(),
3571                3,
3572                "Untrimmed source should have 3 expressions"
3573            );
3574        } else {
3575            panic!("Expected Select source");
3576        }
3577    }
3578
3579    #[test]
3580    fn test_lineage_expression_in_select() {
3581        let expr = parse("SELECT a + b AS c FROM t");
3582        let node = lineage("c", &expr, None, false).unwrap();
3583
3584        // Should trace to both a and b from t
3585        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3586        assert!(
3587            all_names.len() >= 3,
3588            "Expected to trace a + b to both columns, got: {:?}",
3589            all_names
3590        );
3591    }
3592
3593    #[test]
3594    fn test_set_operation_by_index() {
3595        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3596
3597        // Trace column "a" which is at index 0
3598        let node = lineage("a", &expr, None, false).unwrap();
3599
3600        // UNION branches should be traced by index
3601        assert_eq!(node.downstream.len(), 2);
3602    }
3603
3604    // --- Tests for column lineage inside function calls (issue #18) ---
3605
3606    fn print_node(node: &LineageNode, indent: usize) {
3607        let pad = "  ".repeat(indent);
3608        println!(
3609            "{pad}name={:?} source_name={:?}",
3610            node.name, node.source_name
3611        );
3612        for child in &node.downstream {
3613            print_node(child, indent + 1);
3614        }
3615    }
3616
3617    #[test]
3618    fn test_issue18_repro() {
3619        // Exact scenario from the issue
3620        let query = "SELECT UPPER(name) as upper_name FROM users";
3621        println!("Query: {query}\n");
3622
3623        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3624        let exprs = dialect.parse(query).unwrap();
3625        let expr = &exprs[0];
3626
3627        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3628        println!("lineage(\"upper_name\"):");
3629        print_node(&node, 1);
3630
3631        let names = node.downstream_names();
3632        assert!(
3633            names.iter().any(|n| n == "users.name"),
3634            "Expected users.name in downstream, got: {:?}",
3635            names
3636        );
3637    }
3638
3639    #[test]
3640    fn test_lineage_bigquery_safe_namespace_issue207() {
3641        let query = r#"
3642WITH import_cte AS (
3643  SELECT timestamp, data, operation
3644  FROM `project`.`dataset`.`source_table`
3645),
3646transform_cte AS (
3647  SELECT
3648    timestamp,
3649    SAFE.PARSE_JSON(data) AS json_data
3650  FROM import_cte
3651)
3652SELECT json_data FROM transform_cte
3653"#;
3654        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3655        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3656            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3657        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3658
3659        assert!(
3660            names.iter().any(|name| name == "source_table.data"),
3661            "expected source_table.data in lineage, got {names:?}"
3662        );
3663        assert!(
3664            !names
3665                .iter()
3666                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3667            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3668        );
3669    }
3670
3671    #[test]
3672    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3673        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3674        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3675            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3676        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3677
3678        assert!(
3679            names.iter().any(|name| name == "t.data"),
3680            "expected t.data in lineage, got {names:?}"
3681        );
3682        assert!(
3683            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3684            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3685        );
3686    }
3687
3688    #[test]
3689    fn test_lineage_method_call_receiver_control() {
3690        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3691        let node = lineage("out", &expr, None, false).expect("lineage");
3692        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3693
3694        assert!(
3695            names.iter().any(|name| name == "t.obj"),
3696            "expected ordinary method receiver to remain in lineage, got {names:?}"
3697        );
3698        assert!(
3699            names.iter().any(|name| name == "t.arg"),
3700            "expected method argument in lineage, got {names:?}"
3701        );
3702    }
3703
3704    #[test]
3705    fn test_lineage_upper_function() {
3706        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3707        let node = lineage("upper_name", &expr, None, false).unwrap();
3708
3709        let names = node.downstream_names();
3710        assert!(
3711            names.iter().any(|n| n == "users.name"),
3712            "Expected users.name in downstream, got: {:?}",
3713            names
3714        );
3715    }
3716
3717    #[test]
3718    fn test_lineage_round_function() {
3719        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3720        let node = lineage("rounded", &expr, None, false).unwrap();
3721
3722        let names = node.downstream_names();
3723        assert!(
3724            names.iter().any(|n| n == "products.price"),
3725            "Expected products.price in downstream, got: {:?}",
3726            names
3727        );
3728    }
3729
3730    #[test]
3731    fn test_lineage_coalesce_function() {
3732        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3733        let node = lineage("val", &expr, None, false).unwrap();
3734
3735        let names = node.downstream_names();
3736        assert!(
3737            names.iter().any(|n| n == "t.a"),
3738            "Expected t.a in downstream, got: {:?}",
3739            names
3740        );
3741        assert!(
3742            names.iter().any(|n| n == "t.b"),
3743            "Expected t.b in downstream, got: {:?}",
3744            names
3745        );
3746    }
3747
3748    #[test]
3749    fn test_lineage_count_function() {
3750        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3751        let node = lineage("cnt", &expr, None, false).unwrap();
3752
3753        let names = node.downstream_names();
3754        assert!(
3755            names.iter().any(|n| n == "t.id"),
3756            "Expected t.id in downstream, got: {:?}",
3757            names
3758        );
3759    }
3760
3761    #[test]
3762    fn test_lineage_sum_function() {
3763        let expr = parse("SELECT SUM(amount) AS total FROM t");
3764        let node = lineage("total", &expr, None, false).unwrap();
3765
3766        let names = node.downstream_names();
3767        assert!(
3768            names.iter().any(|n| n == "t.amount"),
3769            "Expected t.amount in downstream, got: {:?}",
3770            names
3771        );
3772    }
3773
3774    #[test]
3775    fn test_lineage_case_with_nested_functions() {
3776        let expr =
3777            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3778        let node = lineage("result", &expr, None, false).unwrap();
3779
3780        let names = node.downstream_names();
3781        assert!(
3782            names.iter().any(|n| n == "t.x"),
3783            "Expected t.x in downstream, got: {:?}",
3784            names
3785        );
3786        assert!(
3787            names.iter().any(|n| n == "t.name"),
3788            "Expected t.name in downstream, got: {:?}",
3789            names
3790        );
3791    }
3792
3793    #[test]
3794    fn test_lineage_substring_function() {
3795        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3796        let node = lineage("short", &expr, None, false).unwrap();
3797
3798        let names = node.downstream_names();
3799        assert!(
3800            names.iter().any(|n| n == "t.name"),
3801            "Expected t.name in downstream, got: {:?}",
3802            names
3803        );
3804    }
3805
3806    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3807
3808    #[test]
3809    fn test_lineage_cte_select_star() {
3810        // Ported from sqlglot: test_lineage_source_with_star
3811        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3812        // After star expansion: SELECT y.a AS a FROM y
3813        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3814        let node = lineage("a", &expr, None, false).unwrap();
3815
3816        assert_eq!(node.name, "a");
3817        // Should successfully resolve column 'a' through the CTE
3818        // (previously failed with "Cannot find column 'a' in query")
3819        assert!(
3820            !node.downstream.is_empty(),
3821            "Expected downstream nodes tracing through CTE, got none"
3822        );
3823    }
3824
3825    #[test]
3826    fn test_lineage_cte_select_star_renamed_column() {
3827        // dbt standard pattern: CTE with column rename + outer SELECT *
3828        // This is the primary use case for dbt projects (jaffle-shop etc.)
3829        let expr =
3830            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3831        let node = lineage("customer_id", &expr, None, false).unwrap();
3832
3833        assert_eq!(node.name, "customer_id");
3834        // Should trace customer_id → renamed CTE → source.id
3835        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3836        assert!(
3837            all_names.len() >= 2,
3838            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3839            all_names
3840        );
3841    }
3842
3843    #[test]
3844    fn test_lineage_cte_select_star_multiple_columns() {
3845        // CTE exposes multiple columns, outer SELECT * should resolve each
3846        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3847
3848        for col in &["a", "b", "c"] {
3849            let node = lineage(col, &expr, None, false).unwrap();
3850            assert_eq!(node.name, *col);
3851            // Verify lineage resolves without error (star expanded to explicit columns)
3852            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3853            assert!(
3854                all_names.len() >= 2,
3855                "Expected at least 2 nodes for column {}, got: {:?}",
3856                col,
3857                all_names
3858            );
3859        }
3860    }
3861
3862    #[test]
3863    fn test_lineage_nested_cte_select_star() {
3864        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3865        let expr = parse(
3866            "WITH cte1 AS (SELECT a FROM t), \
3867             cte2 AS (SELECT * FROM cte1) \
3868             SELECT * FROM cte2",
3869        );
3870        let node = lineage("a", &expr, None, false).unwrap();
3871
3872        assert_eq!(node.name, "a");
3873        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3874        assert!(
3875            all_names.len() >= 3,
3876            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3877            all_names
3878        );
3879    }
3880
3881    #[test]
3882    fn test_lineage_three_level_nested_cte_star() {
3883        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3884        let expr = parse(
3885            "WITH cte1 AS (SELECT x FROM t), \
3886             cte2 AS (SELECT * FROM cte1), \
3887             cte3 AS (SELECT * FROM cte2) \
3888             SELECT * FROM cte3",
3889        );
3890        let node = lineage("x", &expr, None, false).unwrap();
3891
3892        assert_eq!(node.name, "x");
3893        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3894        assert!(
3895            all_names.len() >= 4,
3896            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3897            all_names
3898        );
3899    }
3900
3901    #[test]
3902    fn test_lineage_cte_union_star() {
3903        // CTE with UNION body, outer SELECT * should resolve from left branch
3904        let expr = parse(
3905            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3906             SELECT * FROM cte",
3907        );
3908        let node = lineage("a", &expr, None, false).unwrap();
3909
3910        assert_eq!(node.name, "a");
3911        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3912        assert!(
3913            all_names.len() >= 2,
3914            "Expected at least 2 nodes for CTE union star, got: {:?}",
3915            all_names
3916        );
3917    }
3918
3919    #[test]
3920    fn test_lineage_cte_star_unknown_table() {
3921        // When CTE references an unknown table, star expansion is skipped gracefully
3922        // and lineage falls back to normal resolution (which may fail)
3923        let expr = parse(
3924            "WITH cte AS (SELECT * FROM unknown_table) \
3925             SELECT * FROM cte",
3926        );
3927        // This should not panic — it may succeed or fail depending on resolution,
3928        // but should not crash
3929        let _result = lineage("x", &expr, None, false);
3930    }
3931
3932    #[test]
3933    fn test_lineage_cte_explicit_columns() {
3934        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3935        let expr = parse(
3936            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3937             SELECT * FROM cte",
3938        );
3939        let node = lineage("x", &expr, None, false).unwrap();
3940        assert_eq!(node.name, "x");
3941    }
3942
3943    #[test]
3944    fn test_lineage_cte_qualified_star() {
3945        // Qualified star: SELECT cte.* FROM cte
3946        let expr = parse(
3947            "WITH cte AS (SELECT a, b FROM t) \
3948             SELECT cte.* FROM cte",
3949        );
3950        for col in &["a", "b"] {
3951            let node = lineage(col, &expr, None, false).unwrap();
3952            assert_eq!(node.name, *col);
3953            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3954            assert!(
3955                all_names.len() >= 2,
3956                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3957                col,
3958                all_names
3959            );
3960        }
3961    }
3962
3963    #[test]
3964    fn test_lineage_subquery_select_star() {
3965        // Ported from sqlglot: test_select_star
3966        // SELECT x FROM (SELECT * FROM table_a)
3967        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3968        let node = lineage("x", &expr, None, false).unwrap();
3969
3970        assert_eq!(node.name, "x");
3971        assert!(
3972            !node.downstream.is_empty(),
3973            "Expected downstream nodes for subquery with SELECT *, got none"
3974        );
3975    }
3976
3977    #[test]
3978    fn test_lineage_cte_star_with_schema_external_table() {
3979        // CTE references an external table via SELECT * — schema enables expansion
3980        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3981SELECT * FROM orders"#;
3982        let expr = parse(sql);
3983
3984        let mut schema = MappingSchema::new();
3985        let cols = vec![
3986            ("order_id".to_string(), DataType::Unknown),
3987            ("customer_id".to_string(), DataType::Unknown),
3988            ("amount".to_string(), DataType::Unknown),
3989        ];
3990        schema.add_table("stg_orders", &cols, None).unwrap();
3991
3992        let node =
3993            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3994                .unwrap();
3995        assert_eq!(node.name, "order_id");
3996    }
3997
3998    #[test]
3999    fn test_lineage_cte_star_with_schema_three_part_name() {
4000        // CTE references an external table with fully-qualified 3-part name
4001        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
4002SELECT * FROM orders"#;
4003        let expr = parse(sql);
4004
4005        let mut schema = MappingSchema::new();
4006        let cols = vec![
4007            ("order_id".to_string(), DataType::Unknown),
4008            ("customer_id".to_string(), DataType::Unknown),
4009        ];
4010        schema
4011            .add_table("db.schema.stg_orders", &cols, None)
4012            .unwrap();
4013
4014        let node = lineage_with_schema(
4015            "customer_id",
4016            &expr,
4017            Some(&schema as &dyn Schema),
4018            None,
4019            false,
4020        )
4021        .unwrap();
4022        assert_eq!(node.name, "customer_id");
4023    }
4024
4025    #[test]
4026    fn test_lineage_cte_star_with_schema_nested() {
4027        // Nested CTEs: outer CTE references inner CTE with SELECT *,
4028        // inner CTE references external table with SELECT *
4029        let sql = r#"WITH
4030            raw AS (SELECT * FROM external_table),
4031            enriched AS (SELECT * FROM raw)
4032        SELECT * FROM enriched"#;
4033        let expr = parse(sql);
4034
4035        let mut schema = MappingSchema::new();
4036        let cols = vec![
4037            ("id".to_string(), DataType::Unknown),
4038            ("name".to_string(), DataType::Unknown),
4039        ];
4040        schema.add_table("external_table", &cols, None).unwrap();
4041
4042        let node =
4043            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4044        assert_eq!(node.name, "name");
4045    }
4046
4047    #[test]
4048    fn test_lineage_cte_qualified_star_with_schema() {
4049        // CTE uses qualified star (orders.*) from a CTE whose columns
4050        // come from an external table via SELECT *
4051        let sql = r#"WITH
4052            orders AS (SELECT * FROM stg_orders),
4053            enriched AS (
4054                SELECT orders.*, 'extra' AS extra
4055                FROM orders
4056            )
4057        SELECT * FROM enriched"#;
4058        let expr = parse(sql);
4059
4060        let mut schema = MappingSchema::new();
4061        let cols = vec![
4062            ("order_id".to_string(), DataType::Unknown),
4063            ("total".to_string(), DataType::Unknown),
4064        ];
4065        schema.add_table("stg_orders", &cols, None).unwrap();
4066
4067        let node =
4068            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4069                .unwrap();
4070        assert_eq!(node.name, "order_id");
4071
4072        // Also verify the extra column works
4073        let extra =
4074            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4075        assert_eq!(extra.name, "extra");
4076    }
4077
4078    #[test]
4079    fn test_lineage_cte_star_without_schema_still_works() {
4080        // Without schema, CTE-to-CTE star expansion still works
4081        let sql = r#"WITH
4082            cte1 AS (SELECT id, name FROM raw_table),
4083            cte2 AS (SELECT * FROM cte1)
4084        SELECT * FROM cte2"#;
4085        let expr = parse(sql);
4086
4087        // No schema — should still resolve through CTE chain
4088        let node = lineage("id", &expr, None, false).unwrap();
4089        assert_eq!(node.name, "id");
4090    }
4091
4092    #[test]
4093    fn test_lineage_nested_cte_star_with_join_and_schema() {
4094        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
4095        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
4096        let sql = r#"WITH
4097base_orders AS (
4098    SELECT * FROM stg_orders
4099),
4100with_payments AS (
4101    SELECT
4102        base_orders.*,
4103        p.amount
4104    FROM base_orders
4105    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4106),
4107final_cte AS (
4108    SELECT * FROM with_payments
4109)
4110SELECT * FROM final_cte"#;
4111        let expr = parse(sql);
4112
4113        let mut schema = MappingSchema::new();
4114        let order_cols = vec![
4115            (
4116                "order_id".to_string(),
4117                crate::expressions::DataType::Unknown,
4118            ),
4119            (
4120                "customer_id".to_string(),
4121                crate::expressions::DataType::Unknown,
4122            ),
4123            ("status".to_string(), crate::expressions::DataType::Unknown),
4124        ];
4125        let pay_cols = vec![
4126            (
4127                "payment_id".to_string(),
4128                crate::expressions::DataType::Unknown,
4129            ),
4130            (
4131                "order_id".to_string(),
4132                crate::expressions::DataType::Unknown,
4133            ),
4134            ("amount".to_string(), crate::expressions::DataType::Unknown),
4135        ];
4136        schema.add_table("stg_orders", &order_cols, None).unwrap();
4137        schema.add_table("stg_payments", &pay_cols, None).unwrap();
4138
4139        // order_id should trace back to stg_orders
4140        let node =
4141            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4142                .unwrap();
4143        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4144
4145        // The leaf should be "stg_orders.order_id" (not just "order_id")
4146        let has_table_qualified = all_names
4147            .iter()
4148            .any(|n| n.contains('.') && n.contains("order_id"));
4149        assert!(
4150            has_table_qualified,
4151            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4152            all_names
4153        );
4154
4155        // amount should trace back to stg_payments
4156        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4157            .unwrap();
4158        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4159
4160        let has_table_qualified = all_names
4161            .iter()
4162            .any(|n| n.contains('.') && n.contains("amount"));
4163        assert!(
4164            has_table_qualified,
4165            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4166            all_names
4167        );
4168    }
4169
4170    #[test]
4171    fn test_lineage_cte_alias_resolution() {
4172        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
4173        let sql = r#"WITH import_stg_items AS (
4174    SELECT item_id, name, status FROM stg_items
4175)
4176SELECT base.item_id, base.status
4177FROM import_stg_items AS base"#;
4178        let expr = parse(sql);
4179
4180        let node = lineage("item_id", &expr, None, false).unwrap();
4181        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4182        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
4183        assert!(
4184            all_names.iter().any(|n| n == "stg_items.item_id"),
4185            "Expected leaf 'stg_items.item_id', got: {:?}",
4186            all_names
4187        );
4188    }
4189
4190    #[test]
4191    fn test_lineage_cte_alias_with_schema_and_star() {
4192        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
4193        let sql = r#"WITH import_stg AS (
4194    SELECT * FROM stg_items
4195)
4196SELECT base.item_id, base.status
4197FROM import_stg AS base"#;
4198        let expr = parse(sql);
4199
4200        let mut schema = MappingSchema::new();
4201        schema
4202            .add_table(
4203                "stg_items",
4204                &[
4205                    ("item_id".to_string(), DataType::Unknown),
4206                    ("name".to_string(), DataType::Unknown),
4207                    ("status".to_string(), DataType::Unknown),
4208                ],
4209                None,
4210            )
4211            .unwrap();
4212
4213        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4214            .unwrap();
4215        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4216        assert!(
4217            all_names.iter().any(|n| n == "stg_items.item_id"),
4218            "Expected leaf 'stg_items.item_id', got: {:?}",
4219            all_names
4220        );
4221    }
4222
4223    #[test]
4224    fn test_lineage_cte_alias_with_join() {
4225        // Multiple CTE aliases in a JOIN: each should resolve independently
4226        let sql = r#"WITH
4227    import_users AS (SELECT id, name FROM users),
4228    import_orders AS (SELECT id, user_id, amount FROM orders)
4229SELECT u.name, o.amount
4230FROM import_users AS u
4231LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4232        let expr = parse(sql);
4233
4234        let node = lineage("name", &expr, None, false).unwrap();
4235        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4236        assert!(
4237            all_names.iter().any(|n| n == "users.name"),
4238            "Expected leaf 'users.name', got: {:?}",
4239            all_names
4240        );
4241
4242        let node = lineage("amount", &expr, None, false).unwrap();
4243        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4244        assert!(
4245            all_names.iter().any(|n| n == "orders.amount"),
4246            "Expected leaf 'orders.amount', got: {:?}",
4247            all_names
4248        );
4249    }
4250
4251    // -----------------------------------------------------------------------
4252    // Quoted CTE name tests — verifying SQL identifier case semantics
4253    // -----------------------------------------------------------------------
4254
4255    #[test]
4256    fn test_lineage_unquoted_cte_case_insensitive() {
4257        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
4258        // MyCte and MYCTE should match.
4259        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4260        let node = lineage("col", &expr, None, false).unwrap();
4261        assert_eq!(node.name, "col");
4262        assert!(
4263            !node.downstream.is_empty(),
4264            "Unquoted CTE should resolve case-insensitively"
4265        );
4266    }
4267
4268    #[test]
4269    fn test_lineage_quoted_cte_case_preserved() {
4270        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
4271        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4272        let node = lineage("col", &expr, None, false).unwrap();
4273        assert_eq!(node.name, "col");
4274        assert!(
4275            !node.downstream.is_empty(),
4276            "Quoted CTE with matching case should resolve"
4277        );
4278    }
4279
4280    #[test]
4281    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4282        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
4283        // sqlglot treats this as a table reference, not a CTE match.
4284        // Star expansion should NOT resolve through the CTE.
4285        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4286        // lineage("col", ...) should fail because "mycte" is treated as an external
4287        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
4288        let result = lineage("col", &expr, None, false);
4289        assert!(
4290            result.is_err(),
4291            "Quoted CTE with case mismatch should not expand star: {:?}",
4292            result
4293        );
4294    }
4295
4296    #[test]
4297    fn test_lineage_mixed_quoted_unquoted_cte() {
4298        // Mix of unquoted and quoted CTEs in a nested chain.
4299        let expr = parse(
4300            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4301        );
4302        let node = lineage("a", &expr, None, false).unwrap();
4303        assert_eq!(node.name, "a");
4304        assert!(
4305            !node.downstream.is_empty(),
4306            "Mixed quoted/unquoted CTE chain should resolve"
4307        );
4308    }
4309
4310    // -----------------------------------------------------------------------
4311    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
4312    // -----------------------------------------------------------------------
4313    //
4314    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
4315    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
4316    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
4317    // direct string comparison for CTE name matching, ignoring the quoted status.
4318    //
4319    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
4320    // and unquoted as case-insensitive. The scope system should do the same.
4321    //
4322    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
4323    // which is broader than the star expansion scope of this PR.
4324
4325    #[test]
4326    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4327        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
4328        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
4329        // as "mycte" incorrectly resolves to the CTE.
4330        //
4331        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
4332        // case-sensitive: "mycte" should NOT match CTE "MyCte".
4333        //
4334        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4335        // this test should fail — update the assertion to match correct behavior:
4336        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
4337        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4338        let node = lineage("col", &expr, None, false).unwrap();
4339        assert!(!node.downstream.is_empty());
4340        let child = &node.downstream[0];
4341        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
4342        assert_eq!(
4343            child.source_name, "MyCte",
4344            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4345             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4346        );
4347    }
4348
4349    #[test]
4350    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4351        // Known bug: same as above but with qualified column reference ("mycte".col).
4352        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
4353        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
4354        //
4355        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4356        // this test should fail — update to assert source_name != "MyCte".
4357        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4358        let node = lineage("col", &expr, None, false).unwrap();
4359        assert!(!node.downstream.is_empty());
4360        let child = &node.downstream[0];
4361        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
4362        assert_eq!(
4363            child.source_name, "MyCte",
4364            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4365             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4366        );
4367    }
4368
4369    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
4370
4371    /// sqlglot: test_node_name_doesnt_contain_comment
4372    /// Comments in column expressions should not affect lineage resolution.
4373    /// NOTE: This test uses SELECT * from a derived table, which is a separate
4374    /// known limitation in polyglot-sql (star expansion in subqueries).
4375    #[test]
4376    #[ignore = "requires derived table star expansion (separate issue)"]
4377    fn test_node_name_doesnt_contain_comment() {
4378        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4379        let node = lineage("x", &expr, None, false).unwrap();
4380
4381        assert_eq!(node.name, "x");
4382        assert!(!node.downstream.is_empty());
4383    }
4384
4385    /// A line comment between SELECT and the first column wraps the column
4386    /// in an Annotated node. Lineage must unwrap it to find the column name.
4387    /// Verify that commented and uncommented queries produce identical lineage.
4388    #[test]
4389    fn test_comment_before_first_column_in_cte() {
4390        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
4391        let sql_without_comment = "with t as (select 1 as a) select a from t";
4392
4393        // Without comment — baseline
4394        let expr_ok = parse(sql_without_comment);
4395        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4396
4397        // With comment — should produce identical lineage
4398        let expr_comment = parse(sql_with_comment);
4399        let node_comment = lineage("a", &expr_comment, None, false)
4400            .expect("with comment before first column should succeed");
4401
4402        assert_eq!(node_ok.name, node_comment.name, "node names should match");
4403        assert_eq!(
4404            node_ok.downstream_names(),
4405            node_comment.downstream_names(),
4406            "downstream lineage should be identical with or without comment"
4407        );
4408    }
4409
4410    /// Block comment between SELECT and first column.
4411    #[test]
4412    fn test_block_comment_before_first_column() {
4413        let sql = "with t as (select 1 as a) select /* section */ a from t";
4414        let expr = parse(sql);
4415        let node = lineage("a", &expr, None, false)
4416            .expect("block comment before first column should succeed");
4417        assert_eq!(node.name, "a");
4418        assert!(
4419            !node.downstream.is_empty(),
4420            "should have downstream lineage"
4421        );
4422    }
4423
4424    /// Comment before first column should not affect second column resolution.
4425    #[test]
4426    fn test_comment_before_first_column_second_col_ok() {
4427        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
4428        let expr = parse(sql);
4429
4430        let node_a =
4431            lineage("a", &expr, None, false).expect("column a with comment should succeed");
4432        assert_eq!(node_a.name, "a");
4433
4434        let node_b =
4435            lineage("b", &expr, None, false).expect("column b with comment should succeed");
4436        assert_eq!(node_b.name, "b");
4437    }
4438
4439    /// Aliased column with preceding comment.
4440    #[test]
4441    fn test_comment_before_aliased_column() {
4442        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
4443        let expr = parse(sql);
4444        let node =
4445            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4446        assert_eq!(node.name, "y");
4447        assert!(
4448            !node.downstream.is_empty(),
4449            "aliased column should have downstream lineage"
4450        );
4451    }
4452}