Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Semantic source kind for downstream consumers.
33    pub source_kind: SourceKind,
34    /// User-written source alias when different from canonical source name.
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub source_alias: Option<String>,
37    /// Optional reference node name (e.g., for CTEs)
38    pub reference_node_name: String,
39}
40
41impl LineageNode {
42    /// Create a new lineage node
43    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44        Self {
45            name: name.into(),
46            expression,
47            source,
48            downstream: Vec::new(),
49            source_name: String::new(),
50            source_kind: SourceKind::Unknown,
51            source_alias: None,
52            reference_node_name: String::new(),
53        }
54    }
55
56    /// Iterate over all nodes in the lineage graph using DFS
57    pub fn walk(&self) -> LineageWalker<'_> {
58        LineageWalker { stack: vec![self] }
59    }
60
61    /// Get all downstream column names
62    pub fn downstream_names(&self) -> Vec<String> {
63        self.downstream.iter().map(|n| n.name.clone()).collect()
64    }
65}
66
67fn source_kind_for_scope_context(
68    scope: &Scope,
69    source_name: &str,
70    reference_node_name: &str,
71) -> SourceKind {
72    if source_name.is_empty() && reference_node_name.is_empty() {
73        return SourceKind::Root;
74    }
75    if let Some(source_info) = scope.sources.get(source_name) {
76        return source_info.kind;
77    }
78    if scope.cte_sources.contains_key(source_name) {
79        return SourceKind::Cte;
80    }
81    match scope.scope_type {
82        ScopeType::Cte => SourceKind::Cte,
83        ScopeType::DerivedTable => SourceKind::DerivedTable,
84        ScopeType::Udtf => SourceKind::Virtual,
85        _ => SourceKind::Unknown,
86    }
87}
88
89fn apply_scope_context(
90    node: &mut LineageNode,
91    scope: &Scope,
92    source_name: &str,
93    reference_node_name: &str,
94) {
95    node.source_name = source_name.to_string();
96    node.reference_node_name = reference_node_name.to_string();
97    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
100/// Iterator for walking the lineage graph
101pub struct LineageWalker<'a> {
102    stack: Vec<&'a LineageNode>,
103}
104
105impl<'a> Iterator for LineageWalker<'a> {
106    type Item = &'a LineageNode;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        if let Some(node) = self.stack.pop() {
110            // Add children in reverse order so they're visited in order
111            for child in node.downstream.iter().rev() {
112                self.stack.push(child);
113            }
114            Some(node)
115        } else {
116            None
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// ColumnRef: name or positional index for column lookup
123// ---------------------------------------------------------------------------
124
/// How a column is addressed while tracing lineage.
enum ColumnRef<'a> {
    /// Look the column up by its name.
    Name(&'a str),
    /// Look the column up by its position (index) in the select list.
    Index(usize),
}
130
131// ---------------------------------------------------------------------------
132// Public API
133// ---------------------------------------------------------------------------
134
135/// Build the lineage graph for a column in a SQL query
136///
137/// # Arguments
138/// * `column` - The column name to trace lineage for
139/// * `sql` - The SQL expression (SELECT, UNION, etc.)
140/// * `dialect` - Optional dialect for parsing
141/// * `trim_selects` - If true, trim the source SELECT to only include the target column
142///
143/// # Returns
144/// The root lineage node for the specified column
145///
146/// # Example
147/// ```ignore
148/// use polyglot_sql::lineage::lineage;
149/// use polyglot_sql::parse_one;
150/// use polyglot_sql::DialectType;
151///
152/// let sql = "SELECT a, b + 1 AS c FROM t";
153/// let expr = parse_one(sql, DialectType::Generic).unwrap();
154/// let node = lineage("c", &expr, None, false).unwrap();
155/// ```
156pub fn lineage(
157    column: &str,
158    sql: &Expression,
159    dialect: Option<DialectType>,
160    trim_selects: bool,
161) -> Result<LineageNode> {
162    // Fast path: skip clone when there are no CTEs to expand
163    let effective_sql = lineage_effective_expression(sql);
164    let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165    if !has_with {
166        return lineage_from_expression(column, sql, dialect, trim_selects);
167    }
168    let mut owned = sql.clone();
169    expand_cte_stars(&mut owned, None);
170    lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173/// Build the lineage graph for a column in a SQL query using optional schema metadata.
174///
175/// When `schema` is provided, the query is first qualified with
176/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
177/// ambiguous column references.
178///
179/// # Arguments
180/// * `column` - The column name to trace lineage for
181/// * `sql` - The SQL expression (SELECT, UNION, etc.)
182/// * `schema` - Optional schema used for qualification
183/// * `dialect` - Optional dialect for qualification and lineage handling
184/// * `trim_selects` - If true, trim the source SELECT to only include the target column
185///
186/// # Returns
187/// The root lineage node for the specified column
188pub fn lineage_with_schema(
189    column: &str,
190    sql: &Expression,
191    schema: Option<&dyn Schema>,
192    dialect: Option<DialectType>,
193    trim_selects: bool,
194) -> Result<LineageNode> {
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new().with_dialect(dialect_type)
198        } else {
199            QualifyColumnsOptions::new()
200        };
201
202        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203            Error::internal(format!("Lineage qualification failed with schema: {}", e))
204        })?
205    } else {
206        sql.clone()
207    };
208
209    // Annotate types in-place so lineage nodes carry type information
210    annotate_types(&mut qualified_expression, schema, dialect);
211
212    // Expand CTE stars on the already-owned expression (no extra clone).
213    // Pass schema so that stars from external tables can also be resolved.
214    expand_cte_stars(&mut qualified_expression, schema);
215
216    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220    column: &str,
221    sql: &Expression,
222    dialect: Option<DialectType>,
223    trim_selects: bool,
224) -> Result<LineageNode> {
225    let sql = lineage_effective_expression(sql);
226    let scope = build_scope(sql);
227    to_node(
228        ColumnRef::Name(column),
229        &scope,
230        dialect,
231        "",
232        "",
233        "",
234        trim_selects,
235    )
236}
237
238pub(crate) fn lineage_by_index_from_expression(
239    column_index: usize,
240    sql: &Expression,
241    dialect: Option<DialectType>,
242    trim_selects: bool,
243) -> Result<LineageNode> {
244    let sql = lineage_effective_expression(sql);
245    let scope = build_scope(sql);
246    to_node(
247        ColumnRef::Index(column_index),
248        &scope,
249        dialect,
250        "",
251        "",
252        "",
253        trim_selects,
254    )
255}
256
257fn lineage_effective_expression(sql: &Expression) -> &Expression {
258    match sql {
259        Expression::Prepare(prepare) => &prepare.statement,
260        _ => sql,
261    }
262}
263
264// ---------------------------------------------------------------------------
265// CTE star expansion
266// ---------------------------------------------------------------------------
267
268/// Normalize an identifier for CTE name matching.
269///
270/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
271/// quoted identifiers preserve their original case. This matches sqlglot's
272/// `normalize_identifiers` behavior.
273fn normalize_cte_name(ident: &Identifier) -> String {
274    if ident.quoted {
275        ident.name.clone()
276    } else {
277        ident.name.to_lowercase()
278    }
279}
280
281/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
282/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
283/// which qualify_columns cannot resolve because it processes each SELECT independently.
284///
285/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
286/// by looking up column names in the schema. This enables correct expansion of patterns
287/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
288///
289/// CTE name matching follows SQL identifier semantics: unquoted names are compared
290/// case-insensitively (lowercased), while quoted names preserve their original case.
291/// This matches sqlglot's `normalize_identifiers` behavior.
292pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
293    if let Expression::Prepare(prepare) = expr {
294        expand_cte_stars(&mut prepare.statement, schema);
295        return;
296    }
297
298    let select = match expr {
299        Expression::Select(s) => s,
300        _ => return,
301    };
302
303    let with = match &mut select.with {
304        Some(w) => w,
305        None => return,
306    };
307
308    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
309
310    for cte in &mut with.ctes {
311        let cte_name = normalize_cte_name(&cte.alias);
312
313        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
314        if !cte.columns.is_empty() {
315            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
316            resolved_cte_columns.insert(cte_name, cols);
317            continue;
318        }
319
320        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
321        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
322        // references itself. We detect this conservatively: if the CTE name appears as
323        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
324        // block are still expanded.
325        if with.recursive {
326            let is_self_referencing =
327                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
328                    let body_sources = get_select_sources(body_select);
329                    body_sources.iter().any(|s| s.normalized == cte_name)
330                } else {
331                    false
332                };
333            if is_self_referencing {
334                continue;
335            }
336        }
337
338        // Get the SELECT from the CTE body (handle UNION by taking left branch)
339        let body_select = match get_leftmost_select_mut(&mut cte.this) {
340            Some(s) => s,
341            None => continue,
342        };
343
344        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
345        resolved_cte_columns.insert(cte_name, columns);
346    }
347
348    // Also expand stars in the outer SELECT itself
349    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
350}
351
352/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
353///
354/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
355/// are determined by the left branch. This matches sqlglot's behavior.
356fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
357    let mut current = expr;
358    for _ in 0..MAX_LINEAGE_DEPTH {
359        match current {
360            Expression::Select(s) => return Some(s),
361            Expression::Union(u) => current = &mut u.left,
362            Expression::Intersect(i) => current = &mut i.left,
363            Expression::Except(e) => current = &mut e.left,
364            Expression::Paren(p) => current = &mut p.this,
365            _ => return None,
366        }
367    }
368    None
369}
370
371/// Rewrite star expressions in a SELECT using resolved CTE column lists.
372/// Falls back to `schema` for external table column lookup.
373/// Returns the list of output column names after expansion.
374fn rewrite_stars_in_select(
375    select: &mut Select,
376    resolved_ctes: &HashMap<String, Vec<String>>,
377    schema: Option<&dyn Schema>,
378) -> Vec<String> {
379    // The AST represents star expressions in two forms depending on syntax:
380    //   - `SELECT *`      → Expression::Star (unqualified star)
381    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
382    // Both must be checked to handle all star patterns.
383    let has_star = select
384        .expressions
385        .iter()
386        .any(|e| matches!(e, Expression::Star(_)));
387    let has_qualified_star = select
388        .expressions
389        .iter()
390        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
391
392    if !has_star && !has_qualified_star {
393        // No stars — just extract column names without rewriting
394        return select
395            .expressions
396            .iter()
397            .filter_map(get_expression_output_name)
398            .collect();
399    }
400
401    let sources = get_select_sources(select);
402    let mut new_expressions = Vec::new();
403    let mut result_columns = Vec::new();
404
405    for expr in &select.expressions {
406        match expr {
407            Expression::Star(star) => {
408                let qual = star.table.as_ref();
409                if let Some(expanded) =
410                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
411                {
412                    for (src_alias, col_name) in &expanded {
413                        let table_id = Identifier::new(src_alias);
414                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
415                        result_columns.push(col_name.clone());
416                    }
417                } else {
418                    new_expressions.push(expr.clone());
419                    result_columns.push("*".to_string());
420                }
421            }
422            Expression::Column(c) if c.name.name == "*" => {
423                let qual = c.table.as_ref();
424                if let Some(expanded) =
425                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
426                {
427                    for (_src_alias, col_name) in &expanded {
428                        // Keep the original table qualifier for qualified stars (table.*)
429                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
430                        result_columns.push(col_name.clone());
431                    }
432                } else {
433                    new_expressions.push(expr.clone());
434                    result_columns.push("*".to_string());
435                }
436            }
437            _ => {
438                new_expressions.push(expr.clone());
439                if let Some(name) = get_expression_output_name(expr) {
440                    result_columns.push(name);
441                }
442            }
443        }
444    }
445
446    select.expressions = new_expressions;
447    result_columns
448}
449
450/// Try to expand a star expression by looking up source columns from resolved CTEs,
451/// falling back to the schema for external tables.
452/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
453/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
454fn expand_star_from_sources(
455    qualifier: Option<&Identifier>,
456    sources: &[SourceInfo],
457    resolved_ctes: &HashMap<String, Vec<String>>,
458    schema: Option<&dyn Schema>,
459) -> Option<Vec<(String, String)>> {
460    let mut expanded = Vec::new();
461
462    if let Some(qual) = qualifier {
463        // Qualified star: table.*
464        let qual_normalized = normalize_cte_name(qual);
465        for src in sources {
466            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
467                // Try CTE first
468                if let Some(cols) = resolved_ctes.get(&src.normalized) {
469                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
470                    return Some(expanded);
471                }
472                // Fall back to schema
473                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
474                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
475                    return Some(expanded);
476                }
477            }
478        }
479        None
480    } else {
481        // Unqualified star: expand all sources.
482        // Intentionally conservative: if any source can't be resolved, the entire
483        // expansion is aborted. Partial expansion would produce an incomplete column
484        // list, causing downstream lineage resolution to silently omit columns.
485        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
486        let mut any_expanded = false;
487        for src in sources {
488            if let Some(cols) = resolved_ctes.get(&src.normalized) {
489                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
490                any_expanded = true;
491            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
492                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
493                any_expanded = true;
494            } else {
495                return None;
496            }
497        }
498        if any_expanded {
499            Some(expanded)
500        } else {
501            None
502        }
503    }
504}
505
506/// Look up column names for a table from the schema.
507fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
508    let schema = schema?;
509    if fq_name.is_empty() {
510        return None;
511    }
512    schema
513        .column_names(fq_name)
514        .ok()
515        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
516}
517
518/// Create a Column expression with the given name and optional table qualifier.
519fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
520    Expression::Column(Box::new(crate::expressions::Column {
521        name: Identifier::new(name),
522        table: table.cloned(),
523        join_mark: false,
524        trailing_comments: Vec::new(),
525        span: None,
526        inferred_type: None,
527    }))
528}
529
530/// Extract the output name of a SELECT expression.
531fn get_expression_output_name(expr: &Expression) -> Option<String> {
532    match expr {
533        Expression::Alias(a) => Some(a.alias.name.clone()),
534        Expression::Column(c) => Some(c.name.name.clone()),
535        Expression::Identifier(id) => Some(id.name.clone()),
536        Expression::Star(_) => Some("*".to_string()),
537        _ => None,
538    }
539}
540
/// Description of one source found in a SELECT's FROM / JOIN / LATERAL VIEW
/// clauses, as gathered by `get_select_sources`.
struct SourceInfo {
    /// Name the source goes by in the query: its alias, or the table name when
    /// no alias was written.
    alias: String,
    /// Key for CTE lookup: unquoted → lowercased, quoted → as-is.
    normalized: String,
    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
    fq_name: String,
}
549
550/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
551/// SELECT's FROM and JOIN clauses in a single pass.
552fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
553    let mut sources = Vec::new();
554
555    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
556        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
557            SourceInfo {
558                alias: alias.name.clone(),
559                normalized: normalize_cte_name(alias),
560                fq_name: alias.name.clone(),
561            }
562        }
563
564        fn named_virtual_source_info(alias: &str) -> SourceInfo {
565            SourceInfo {
566                alias: alias.to_string(),
567                normalized: alias.to_lowercase(),
568                fq_name: alias.to_string(),
569            }
570        }
571
572        match expr {
573            Expression::Table(t) => {
574                let normalized = normalize_cte_name(&t.name);
575                let alias = t
576                    .alias
577                    .as_ref()
578                    .map(|a| a.name.clone())
579                    .unwrap_or_else(|| t.name.name.clone());
580                let mut parts = Vec::new();
581                if let Some(catalog) = &t.catalog {
582                    parts.push(catalog.name.clone());
583                }
584                if let Some(schema) = &t.schema {
585                    parts.push(schema.name.clone());
586                }
587                parts.push(t.name.name.clone());
588                let fq_name = parts.join(".");
589                Some(SourceInfo {
590                    alias,
591                    normalized,
592                    fq_name,
593                })
594            }
595            Expression::Subquery(s) => {
596                let alias = s.alias.as_ref()?.name.clone();
597                let normalized = alias.to_lowercase();
598                let fq_name = alias.clone();
599                Some(SourceInfo {
600                    alias,
601                    normalized,
602                    fq_name,
603                })
604            }
605            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
606            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
607                Some(virtual_source_info(&a.alias))
608            }
609            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
610            Expression::LateralView(lateral_view) => lateral_view
611                .table_alias
612                .as_ref()
613                .or_else(|| lateral_view.column_aliases.first())
614                .map(virtual_source_info),
615            Expression::Paren(p) => extract_source(&p.this),
616            _ => None,
617        }
618    }
619
620    if let Some(from) = &select.from {
621        for expr in &from.expressions {
622            if let Some(info) = extract_source(expr) {
623                sources.push(info);
624            }
625        }
626    }
627    for join in &select.joins {
628        if let Some(info) = extract_source(&join.this) {
629            sources.push(info);
630        }
631    }
632    for lateral_view in &select.lateral_views {
633        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
634        {
635            sources.push(info);
636        }
637    }
638    sources
639}
640
641/// Get all source tables from a lineage graph
642pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
643    let mut tables = HashSet::new();
644    collect_source_tables(node, &mut tables);
645    tables
646}
647
648/// Recursively collect source table names from lineage graph
649pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
650    if let Expression::Table(table) = &node.source {
651        tables.insert(table.name.name.clone());
652    }
653    for child in &node.downstream {
654        collect_source_tables(child, tables);
655    }
656}
657
658// ---------------------------------------------------------------------------
659// Core recursive lineage builder
660// ---------------------------------------------------------------------------
661
/// Upper bound on how deep lineage tracing may recurse, so that circular or
/// very deeply nested CTE chains produce an error instead of overflowing the
/// stack. Also caps the descent performed by `get_leftmost_select_mut`.
const MAX_LINEAGE_DEPTH: usize = 64;
665
666/// Recursively build a lineage node for a column in a scope.
667fn to_node(
668    column: ColumnRef<'_>,
669    scope: &Scope,
670    dialect: Option<DialectType>,
671    scope_name: &str,
672    source_name: &str,
673    reference_node_name: &str,
674    trim_selects: bool,
675) -> Result<LineageNode> {
676    to_node_inner(
677        column,
678        scope,
679        dialect,
680        scope_name,
681        source_name,
682        reference_node_name,
683        trim_selects,
684        &[],
685        0,
686    )
687}
688
689fn to_node_inner(
690    column: ColumnRef<'_>,
691    scope: &Scope,
692    dialect: Option<DialectType>,
693    scope_name: &str,
694    source_name: &str,
695    reference_node_name: &str,
696    trim_selects: bool,
697    ancestor_cte_scopes: &[Scope],
698    depth: usize,
699) -> Result<LineageNode> {
700    if depth > MAX_LINEAGE_DEPTH {
701        return Err(Error::internal(format!(
702            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
703        )));
704    }
705    let scope_expr = &scope.expression;
706
707    // Build combined CTE scopes: current scope's cte_scopes + ancestors
708    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
709    for s in ancestor_cte_scopes {
710        all_cte_scopes.push(s);
711    }
712
713    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
714    //    but we need the inner query (SELECT/UNION) for column lookup.
715    let effective_expr = match scope_expr {
716        Expression::Cte(cte) => &cte.this,
717        other => other,
718    };
719
720    // 1. Set operations (UNION / INTERSECT / EXCEPT)
721    if matches!(
722        effective_expr,
723        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
724    ) {
725        // For CTE wrapping a set op, create a temporary scope with the inner expression
726        if matches!(scope_expr, Expression::Cte(_)) {
727            let mut inner_scope = Scope::new(effective_expr.clone());
728            inner_scope.union_scopes = scope.union_scopes.clone();
729            inner_scope.sources = scope.sources.clone();
730            inner_scope.cte_sources = scope.cte_sources.clone();
731            inner_scope.cte_scopes = scope.cte_scopes.clone();
732            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
733            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
734            return handle_set_operation(
735                &column,
736                &inner_scope,
737                dialect,
738                scope_name,
739                source_name,
740                reference_node_name,
741                trim_selects,
742                ancestor_cte_scopes,
743                depth,
744            );
745        }
746        return handle_set_operation(
747            &column,
748            scope,
749            dialect,
750            scope_name,
751            source_name,
752            reference_node_name,
753            trim_selects,
754            ancestor_cte_scopes,
755            depth,
756        );
757    }
758
759    // 2. Find the select expression for this column
760    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
761    let column_name = resolve_column_name(&column, &select_expr);
762
763    // 3. Trim source if requested
764    let node_source = if trim_selects {
765        trim_source(effective_expr, &select_expr)
766    } else {
767        effective_expr.clone()
768    };
769
770    // 4. Create the lineage node
771    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
772    apply_scope_context(&mut node, scope, source_name, reference_node_name);
773
774    // 5. Star handling — add downstream for each source
775    if matches!(&select_expr, Expression::Star(_)) {
776        for (name, source_info) in &scope.sources {
777            let mut child = LineageNode::new(
778                format!("{}.*", name),
779                Expression::Star(crate::expressions::Star {
780                    table: None,
781                    except: None,
782                    replace: None,
783                    rename: None,
784                    trailing_comments: vec![],
785                    span: None,
786                }),
787                source_info.expression.clone(),
788            );
789            apply_source_info_context(&mut child, name, source_info);
790            node.downstream.push(child);
791        }
792        return Ok(node);
793    }
794
795    // 6. Subqueries in select — trace through scalar subqueries
796    let subqueries: Vec<&Expression> =
797        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
798    for sq_expr in subqueries {
799        if let Expression::Subquery(sq) = sq_expr {
800            for sq_scope in &scope.subquery_scopes {
801                if sq_scope.expression == sq.this {
802                    if let Ok(child) = to_node_inner(
803                        ColumnRef::Index(0),
804                        sq_scope,
805                        dialect,
806                        &column_name,
807                        "",
808                        "",
809                        trim_selects,
810                        ancestor_cte_scopes,
811                        depth + 1,
812                    ) {
813                        node.downstream.push(child);
814                    }
815                    break;
816                }
817            }
818        }
819    }
820
821    // 7. Column references — trace each column to its source
822    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
823    for col_ref in col_refs {
824        let col_name = &col_ref.column;
825        if let Some(ref table_id) = col_ref.table {
826            let tbl = &table_id.name;
827            resolve_qualified_column(
828                &mut node,
829                scope,
830                dialect,
831                tbl,
832                col_name,
833                &column_name,
834                trim_selects,
835                &all_cte_scopes,
836                depth,
837            );
838        } else {
839            resolve_unqualified_column(
840                &mut node,
841                scope,
842                dialect,
843                col_name,
844                &column_name,
845                trim_selects,
846                &all_cte_scopes,
847                depth,
848            );
849        }
850    }
851
852    Ok(node)
853}
854
855// ---------------------------------------------------------------------------
856// Set operation handling
857// ---------------------------------------------------------------------------
858
859fn handle_set_operation(
860    column: &ColumnRef<'_>,
861    scope: &Scope,
862    dialect: Option<DialectType>,
863    scope_name: &str,
864    source_name: &str,
865    reference_node_name: &str,
866    trim_selects: bool,
867    ancestor_cte_scopes: &[Scope],
868    depth: usize,
869) -> Result<LineageNode> {
870    let scope_expr = &scope.expression;
871
872    // Determine column index
873    let col_index = match column {
874        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
875        ColumnRef::Index(i) => *i,
876    };
877
878    let col_name = match column {
879        ColumnRef::Name(name) => name.to_string(),
880        ColumnRef::Index(_) => format!("_{col_index}"),
881    };
882
883    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
884    apply_scope_context(&mut node, scope, source_name, reference_node_name);
885
886    // Recurse into each union branch
887    for branch_scope in &scope.union_scopes {
888        if let Ok(child) = to_node_inner(
889            ColumnRef::Index(col_index),
890            branch_scope,
891            dialect,
892            scope_name,
893            "",
894            "",
895            trim_selects,
896            ancestor_cte_scopes,
897            depth + 1,
898        ) {
899            node.downstream.push(child);
900        }
901    }
902
903    Ok(node)
904}
905
906// ---------------------------------------------------------------------------
907// Column resolution helpers
908// ---------------------------------------------------------------------------
909
910fn resolve_qualified_column(
911    node: &mut LineageNode,
912    scope: &Scope,
913    dialect: Option<DialectType>,
914    table: &str,
915    col_name: &str,
916    parent_name: &str,
917    trim_selects: bool,
918    all_cte_scopes: &[&Scope],
919    depth: usize,
920) {
921    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
922    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
923    let resolved_cte_name = resolve_cte_alias(scope, table);
924    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
925
926    // Check if table is a CTE reference — check both the current scope's cte_sources
927    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
928    let is_cte = scope.cte_sources.contains_key(effective_table)
929        || all_cte_scopes.iter().any(
930            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
931        );
932    if is_cte {
933        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
934            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
935            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
936            if let Ok(child) = to_node_inner(
937                ColumnRef::Name(col_name),
938                child_scope,
939                dialect,
940                parent_name,
941                effective_table,
942                parent_name,
943                trim_selects,
944                &ancestors,
945                depth + 1,
946            ) {
947                node.downstream.push(child);
948                return;
949            }
950        }
951    }
952
953    // Check if table is a derived table (is_scope = true in sources)
954    if let Some(source_info) = scope.sources.get(table) {
955        if source_info.is_scope {
956            if let Some(child_scope) = find_child_scope(scope, table) {
957                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
958                if let Ok(child) = to_node_inner(
959                    ColumnRef::Name(col_name),
960                    child_scope,
961                    dialect,
962                    parent_name,
963                    table,
964                    parent_name,
965                    trim_selects,
966                    &ancestors,
967                    depth + 1,
968                ) {
969                    node.downstream.push(child);
970                    return;
971                }
972            }
973        }
974    }
975
976    // Base table source found in current scope: preserve alias in the display name
977    // but store the resolved table expression and name for downstream consumers.
978    if let Some(source_info) = scope.sources.get(table) {
979        if !source_info.is_scope {
980            let mut child = make_table_column_node_from_source(table, col_name, source_info);
981            if source_info.kind == SourceKind::Virtual {
982                attach_virtual_source_dependencies(
983                    &mut child,
984                    scope,
985                    dialect,
986                    table,
987                    &source_info.expression,
988                    trim_selects,
989                    all_cte_scopes,
990                    depth,
991                );
992            }
993            node.downstream.push(child);
994            return;
995        }
996    }
997
998    // Base table or unresolved — terminal node
999    node.downstream
1000        .push(make_table_column_node(table, col_name));
1001}
1002
1003/// Resolve a FROM alias to the original CTE name.
1004///
1005/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
1006/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
1007/// This function checks if `name` is such an alias and returns the CTE name.
1008fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1009    // If it's already a known CTE name, no resolution needed
1010    if scope.cte_sources.contains_key(name) {
1011        return None;
1012    }
1013    // Check if the source's expression is a CTE — if so, extract the CTE name
1014    if let Some(source_info) = scope.sources.get(name) {
1015        if source_info.is_scope {
1016            if let Expression::Cte(cte) = &source_info.expression {
1017                let cte_name = &cte.alias.name;
1018                if scope.cte_sources.contains_key(cte_name) {
1019                    return Some(cte_name.clone());
1020                }
1021            }
1022        }
1023    }
1024    None
1025}
1026
1027fn resolve_unqualified_column(
1028    node: &mut LineageNode,
1029    scope: &Scope,
1030    dialect: Option<DialectType>,
1031    col_name: &str,
1032    parent_name: &str,
1033    trim_selects: bool,
1034    all_cte_scopes: &[&Scope],
1035    depth: usize,
1036) {
1037    // Try to find which source this column belongs to.
1038    // Build the source list from the actual FROM/JOIN clauses to avoid
1039    // mixing in CTE definitions that are in scope but not referenced.
1040    let from_source_names = source_names_from_from_join(scope);
1041
1042    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1043        resolve_qualified_column(
1044            node,
1045            scope,
1046            dialect,
1047            &tbl,
1048            col_name,
1049            parent_name,
1050            trim_selects,
1051            all_cte_scopes,
1052            depth,
1053        );
1054        return;
1055    }
1056
1057    if from_source_names.len() == 1 {
1058        let tbl = &from_source_names[0];
1059        resolve_qualified_column(
1060            node,
1061            scope,
1062            dialect,
1063            tbl,
1064            col_name,
1065            parent_name,
1066            trim_selects,
1067            all_cte_scopes,
1068            depth,
1069        );
1070        return;
1071    }
1072
1073    // Multiple sources — can't resolve without schema info, add unqualified node
1074    let child = LineageNode::new(
1075        col_name.to_string(),
1076        Expression::Column(Box::new(crate::expressions::Column {
1077            name: crate::expressions::Identifier::new(col_name.to_string()),
1078            table: None,
1079            join_mark: false,
1080            trailing_comments: vec![],
1081            span: None,
1082            inferred_type: None,
1083        })),
1084        node.source.clone(),
1085    );
1086    node.downstream.push(child);
1087}
1088
1089fn unique_virtual_source_for_column(
1090    scope: &Scope,
1091    source_names: &[String],
1092    col_name: &str,
1093) -> Option<String> {
1094    let mut matches = source_names.iter().filter_map(|source_name| {
1095        let source = scope.sources.get(source_name)?;
1096        if source.kind == SourceKind::Virtual
1097            && virtual_source_output_columns(source)
1098                .any(|column| column.eq_ignore_ascii_case(col_name))
1099        {
1100            Some(source_name.clone())
1101        } else {
1102            None
1103        }
1104    });
1105
1106    let first = matches.next()?;
1107    if matches.next().is_none() {
1108        Some(first)
1109    } else {
1110        None
1111    }
1112}
1113
1114fn virtual_source_output_columns(
1115    source_info: &ScopeSourceInfo,
1116) -> Box<dyn Iterator<Item = String> + '_> {
1117    match &source_info.expression {
1118        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1119        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1120            Box::new(alias_output_columns(alias))
1121        }
1122        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1123        Expression::LateralView(lateral_view) => {
1124            Box::new(lateral_view_output_columns(lateral_view))
1125        }
1126        _ => Box::new(source_info.alias.clone().into_iter()),
1127    }
1128}
1129
1130fn unnest_output_columns(
1131    unnest: &crate::expressions::UnnestFunc,
1132) -> impl Iterator<Item = String> + '_ {
1133    unnest
1134        .alias
1135        .iter()
1136        .map(|alias| alias.name.clone())
1137        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1138}
1139
1140fn alias_output_columns(
1141    alias: &crate::expressions::Alias,
1142) -> Box<dyn Iterator<Item = String> + '_> {
1143    if alias.column_aliases.is_empty() {
1144        Box::new(std::iter::once(alias.alias.name.clone()))
1145    } else {
1146        Box::new(
1147            alias
1148                .column_aliases
1149                .iter()
1150                .map(|column| column.name.clone()),
1151        )
1152    }
1153}
1154
1155fn lateral_output_columns(
1156    lateral: &crate::expressions::Lateral,
1157) -> Box<dyn Iterator<Item = String> + '_> {
1158    if lateral.column_aliases.is_empty() {
1159        default_virtual_output_columns(&lateral.this)
1160    } else {
1161        Box::new(lateral.column_aliases.iter().cloned())
1162    }
1163}
1164
1165fn lateral_view_output_columns(
1166    lateral_view: &crate::expressions::LateralView,
1167) -> Box<dyn Iterator<Item = String> + '_> {
1168    Box::new(
1169        lateral_view
1170            .column_aliases
1171            .iter()
1172            .map(|column| column.name.clone()),
1173    )
1174}
1175
1176fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1177    match expr {
1178        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1179        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1180            alias_output_columns(alias)
1181        }
1182        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1183            Box::new(
1184                ["seq", "key", "path", "index", "value", "this"]
1185                    .into_iter()
1186                    .map(String::from),
1187            )
1188        }
1189        _ => Box::new(std::iter::empty()),
1190    }
1191}
1192
1193fn attach_virtual_source_dependencies(
1194    node: &mut LineageNode,
1195    scope: &Scope,
1196    dialect: Option<DialectType>,
1197    source_alias: &str,
1198    source_expr: &Expression,
1199    trim_selects: bool,
1200    all_cte_scopes: &[&Scope],
1201    depth: usize,
1202) {
1203    let parent_name = node.name.clone();
1204    let mut seen = HashSet::new();
1205    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1206        let key = (
1207            col_ref.table.as_ref().map(|t| t.name.clone()),
1208            col_ref.column.clone(),
1209        );
1210        if !seen.insert(key) {
1211            continue;
1212        }
1213
1214        if let Some(table_id) = col_ref.table {
1215            let table = table_id.name;
1216            if table == source_alias {
1217                continue;
1218            }
1219            resolve_qualified_column(
1220                node,
1221                scope,
1222                dialect,
1223                &table,
1224                &col_ref.column,
1225                &parent_name,
1226                trim_selects,
1227                all_cte_scopes,
1228                depth + 1,
1229            );
1230        } else {
1231            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1232            if non_virtual_sources.len() == 1 {
1233                resolve_qualified_column(
1234                    node,
1235                    scope,
1236                    dialect,
1237                    &non_virtual_sources[0],
1238                    &col_ref.column,
1239                    &parent_name,
1240                    trim_selects,
1241                    all_cte_scopes,
1242                    depth + 1,
1243                );
1244            }
1245        }
1246    }
1247}
1248
1249fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1250    fn source_name(expr: &Expression) -> Option<String> {
1251        match expr {
1252            Expression::Table(table) => Some(
1253                table
1254                    .alias
1255                    .as_ref()
1256                    .map(|a| a.name.clone())
1257                    .unwrap_or_else(|| table.name.name.clone()),
1258            ),
1259            Expression::Subquery(subquery) => {
1260                subquery.alias.as_ref().map(|alias| alias.name.clone())
1261            }
1262            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1263            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1264                Some(alias.alias.name.clone())
1265            }
1266            Expression::Lateral(lateral) => lateral.alias.clone(),
1267            Expression::LateralView(lateral_view) => lateral_view
1268                .table_alias
1269                .as_ref()
1270                .or_else(|| lateral_view.column_aliases.first())
1271                .map(|alias| alias.name.clone()),
1272            Expression::Paren(paren) => source_name(&paren.this),
1273            _ => None,
1274        }
1275    }
1276
1277    let effective_expr = match &scope.expression {
1278        Expression::Cte(cte) => &cte.this,
1279        expr => expr,
1280    };
1281
1282    let mut names = Vec::new();
1283    let mut seen = std::collections::HashSet::new();
1284
1285    if let Expression::Select(select) = effective_expr {
1286        if let Some(from) = &select.from {
1287            for expr in &from.expressions {
1288                if let Some(name) = source_name(expr) {
1289                    if !name.is_empty() && seen.insert(name.clone()) {
1290                        names.push(name);
1291                    }
1292                }
1293            }
1294        }
1295        for join in &select.joins {
1296            if let Some(name) = source_name(&join.this) {
1297                if !name.is_empty() && seen.insert(name.clone()) {
1298                    names.push(name);
1299                }
1300            }
1301        }
1302        for lateral_view in &select.lateral_views {
1303            if let Some(name) =
1304                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1305            {
1306                if !name.is_empty() && seen.insert(name.clone()) {
1307                    names.push(name);
1308                }
1309            }
1310        }
1311    }
1312
1313    names
1314}
1315
1316fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1317    source_names_from_from_join(scope)
1318        .into_iter()
1319        .filter(|name| {
1320            !matches!(
1321                scope.sources.get(name).map(|source| source.kind),
1322                Some(SourceKind::Virtual)
1323            )
1324        })
1325        .collect()
1326}
1327
1328// ---------------------------------------------------------------------------
1329// Helper functions
1330// ---------------------------------------------------------------------------
1331
1332/// Get the alias or name of an expression
1333fn get_alias_or_name(expr: &Expression) -> Option<String> {
1334    match expr {
1335        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1336        Expression::Column(col) => Some(col.name.name.clone()),
1337        Expression::Identifier(id) => Some(id.name.clone()),
1338        Expression::Star(_) => Some("*".to_string()),
1339        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1340        // Unwrap to get the actual column/alias name from the inner expression.
1341        Expression::Annotated(a) => get_alias_or_name(&a.this),
1342        _ => None,
1343    }
1344}
1345
1346/// Resolve the display name for a column reference.
1347fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1348    match column {
1349        ColumnRef::Name(n) => n.to_string(),
1350        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1351    }
1352}
1353
1354/// Find the select expression matching a column reference.
1355fn find_select_expr(
1356    scope_expr: &Expression,
1357    column: &ColumnRef<'_>,
1358    dialect: Option<DialectType>,
1359) -> Result<Expression> {
1360    if let Expression::Select(ref select) = scope_expr {
1361        match column {
1362            ColumnRef::Name(name) => {
1363                let normalized_name = normalize_column_name(name, dialect);
1364                for expr in &select.expressions {
1365                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1366                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1367                            return Ok(expr.clone());
1368                        }
1369                    }
1370                }
1371                Err(crate::error::Error::parse(
1372                    format!("Cannot find column '{}' in query", name),
1373                    0,
1374                    0,
1375                    0,
1376                    0,
1377                ))
1378            }
1379            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1380                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1381            }),
1382        }
1383    } else {
1384        Err(crate::error::Error::parse(
1385            "Expected SELECT expression for column lookup",
1386            0,
1387            0,
1388            0,
1389            0,
1390        ))
1391    }
1392}
1393
1394/// Find the positional index of a column name in a set operation's first SELECT branch.
1395fn column_to_index(
1396    set_op_expr: &Expression,
1397    name: &str,
1398    dialect: Option<DialectType>,
1399) -> Result<usize> {
1400    let normalized_name = normalize_column_name(name, dialect);
1401    let mut expr = set_op_expr;
1402    loop {
1403        match expr {
1404            Expression::Union(u) => expr = &u.left,
1405            Expression::Intersect(i) => expr = &i.left,
1406            Expression::Except(e) => expr = &e.left,
1407            Expression::Select(select) => {
1408                for (i, e) in select.expressions.iter().enumerate() {
1409                    if let Some(alias_or_name) = get_alias_or_name(e) {
1410                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1411                            return Ok(i);
1412                        }
1413                    }
1414                }
1415                return Err(crate::error::Error::parse(
1416                    format!("Cannot find column '{}' in set operation", name),
1417                    0,
1418                    0,
1419                    0,
1420                    0,
1421                ));
1422            }
1423            _ => {
1424                return Err(crate::error::Error::parse(
1425                    "Expected SELECT or set operation",
1426                    0,
1427                    0,
1428                    0,
1429                    0,
1430                ))
1431            }
1432        }
1433    }
1434}
1435
/// Normalizes a column name for comparison by delegating to `normalize_name`
/// with the given dialect.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    // NOTE(review): the two trailing flags are positional booleans of
    // `normalize_name`; confirm their meaning against that function's signature.
    normalize_name(name, dialect, false, true)
}
1439
1440/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1441fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1442    if let Expression::Select(select) = select_expr {
1443        let mut trimmed = select.as_ref().clone();
1444        trimmed.expressions = vec![target_expr.clone()];
1445        Expression::Select(Box::new(trimmed))
1446    } else {
1447        select_expr.clone()
1448    }
1449}
1450
1451/// Find the child scope (CTE or derived table) for a given source name.
1452fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1453    // Check CTE scopes
1454    if scope.cte_sources.contains_key(source_name) {
1455        for cte_scope in &scope.cte_scopes {
1456            if let Expression::Cte(cte) = &cte_scope.expression {
1457                if cte.alias.name == source_name {
1458                    return Some(cte_scope);
1459                }
1460            }
1461        }
1462    }
1463
1464    // Check derived table scopes
1465    if let Some(source_info) = scope.sources.get(source_name) {
1466        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1467            if let Expression::Subquery(sq) = &source_info.expression {
1468                for dt_scope in &scope.derived_table_scopes {
1469                    if dt_scope.expression == sq.this {
1470                        return Some(dt_scope);
1471                    }
1472                }
1473            }
1474        }
1475    }
1476
1477    None
1478}
1479
1480/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1481/// This handles nested CTEs where the current scope doesn't have the CTE scope
1482/// as a direct child but knows about it via cte_sources.
1483fn find_child_scope_in<'a>(
1484    all_cte_scopes: &[&'a Scope],
1485    scope: &'a Scope,
1486    source_name: &str,
1487) -> Option<&'a Scope> {
1488    // First try the scope's own cte_scopes
1489    for cte_scope in &scope.cte_scopes {
1490        if let Expression::Cte(cte) = &cte_scope.expression {
1491            if cte.alias.name == source_name {
1492                return Some(cte_scope);
1493            }
1494        }
1495    }
1496
1497    // Then search through all ancestor CTE scopes
1498    for cte_scope in all_cte_scopes {
1499        if let Expression::Cte(cte) = &cte_scope.expression {
1500            if cte.alias.name == source_name {
1501                return Some(cte_scope);
1502            }
1503        }
1504    }
1505
1506    // Fall back to derived table scopes
1507    if let Some(source_info) = scope.sources.get(source_name) {
1508        if source_info.is_scope {
1509            if let Expression::Subquery(sq) = &source_info.expression {
1510                for dt_scope in &scope.derived_table_scopes {
1511                    if dt_scope.expression == sq.this {
1512                        return Some(dt_scope);
1513                    }
1514                }
1515            }
1516        }
1517    }
1518
1519    None
1520}
1521
1522/// Create a terminal lineage node for a table.column reference.
1523fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1524    let mut node = LineageNode::new(
1525        format!("{}.{}", table, column),
1526        Expression::Column(Box::new(crate::expressions::Column {
1527            name: crate::expressions::Identifier::new(column.to_string()),
1528            table: Some(crate::expressions::Identifier::new(table.to_string())),
1529            join_mark: false,
1530            trailing_comments: vec![],
1531            span: None,
1532            inferred_type: None,
1533        })),
1534        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1535    );
1536    node.source_name = table.to_string();
1537    node.source_kind = SourceKind::Table;
1538    node
1539}
1540
1541fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1542    let mut parts: Vec<String> = Vec::new();
1543    if let Some(catalog) = &table_ref.catalog {
1544        parts.push(catalog.name.clone());
1545    }
1546    if let Some(schema) = &table_ref.schema {
1547        parts.push(schema.name.clone());
1548    }
1549    parts.push(table_ref.name.name.clone());
1550    parts.join(".")
1551}
1552
1553fn apply_source_info_context(
1554    node: &mut LineageNode,
1555    source_key: &str,
1556    source_info: &ScopeSourceInfo,
1557) {
1558    node.source_kind = source_info.kind;
1559    node.source_name =
1560        source_info
1561            .lineage_name
1562            .clone()
1563            .unwrap_or_else(|| match &source_info.expression {
1564                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1565                _ => source_key.to_string(),
1566            });
1567    node.source_alias = source_info.alias.clone();
1568}
1569
1570fn make_table_column_node_from_source(
1571    source_key: &str,
1572    column: &str,
1573    source_info: &ScopeSourceInfo,
1574) -> LineageNode {
1575    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1576    let mut node = LineageNode::new(
1577        format!("{}.{}", lineage_name, column),
1578        Expression::Column(Box::new(crate::expressions::Column {
1579            name: crate::expressions::Identifier::new(column.to_string()),
1580            table: Some(crate::expressions::Identifier::new(
1581                lineage_name.to_string(),
1582            )),
1583            join_mark: false,
1584            trailing_comments: vec![],
1585            span: None,
1586            inferred_type: None,
1587        })),
1588        source_info.expression.clone(),
1589    );
1590
1591    apply_source_info_context(&mut node, source_key, source_info);
1592
1593    node
1594}
1595
/// Simple column reference extracted from an expression.
///
/// Holds only the parts of a column expression that lineage resolution needs:
/// the optional table qualifier and the column name.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier copied from the column expression, if it had one.
    table: Option<crate::expressions::Identifier>,
    /// Column name copied from the column expression's identifier.
    column: String,
}
1602
/// Find all column references in an expression (does not recurse into subqueries).
///
/// Thin wrapper that allocates the result vector and lets
/// `collect_column_refs` fill it; the references are returned in the order
/// that function pushes them.
fn find_column_refs_in_expr(
    expr: &Expression,
    dialect: Option<DialectType>,
) -> Vec<SimpleColumnRef> {
    let mut refs = Vec::new();
    collect_column_refs(expr, dialect, &mut refs);
    refs
}
1612
1613fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1614    match expr {
1615        Expression::Column(col) => {
1616            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1617        }
1618        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1619        _ => false,
1620    }
1621}
1622
1623fn collect_column_refs(
1624    expr: &Expression,
1625    dialect: Option<DialectType>,
1626    refs: &mut Vec<SimpleColumnRef>,
1627) {
1628    let mut stack: Vec<&Expression> = vec![expr];
1629
1630    while let Some(current) = stack.pop() {
1631        match current {
1632            // === Leaf: collect Column references ===
1633            Expression::Column(col) => {
1634                refs.push(SimpleColumnRef {
1635                    table: col.table.clone(),
1636                    column: col.name.name.clone(),
1637                });
1638            }
1639
1640            // === Boundary: don't recurse into subqueries (handled separately) ===
1641            Expression::Subquery(_) | Expression::Exists(_) => {}
1642
1643            // === BinaryOp variants: left, right ===
1644            Expression::And(op)
1645            | Expression::Or(op)
1646            | Expression::Eq(op)
1647            | Expression::Neq(op)
1648            | Expression::Lt(op)
1649            | Expression::Lte(op)
1650            | Expression::Gt(op)
1651            | Expression::Gte(op)
1652            | Expression::Add(op)
1653            | Expression::Sub(op)
1654            | Expression::Mul(op)
1655            | Expression::Div(op)
1656            | Expression::Mod(op)
1657            | Expression::BitwiseAnd(op)
1658            | Expression::BitwiseOr(op)
1659            | Expression::BitwiseXor(op)
1660            | Expression::BitwiseLeftShift(op)
1661            | Expression::BitwiseRightShift(op)
1662            | Expression::Concat(op)
1663            | Expression::Adjacent(op)
1664            | Expression::TsMatch(op)
1665            | Expression::PropertyEQ(op)
1666            | Expression::ArrayContainsAll(op)
1667            | Expression::ArrayContainedBy(op)
1668            | Expression::ArrayOverlaps(op)
1669            | Expression::JSONBContainsAllTopKeys(op)
1670            | Expression::JSONBContainsAnyTopKeys(op)
1671            | Expression::JSONBDeleteAtPath(op)
1672            | Expression::ExtendsLeft(op)
1673            | Expression::ExtendsRight(op)
1674            | Expression::Is(op)
1675            | Expression::MemberOf(op)
1676            | Expression::NullSafeEq(op)
1677            | Expression::NullSafeNeq(op)
1678            | Expression::Glob(op)
1679            | Expression::Match(op) => {
1680                stack.push(&op.left);
1681                stack.push(&op.right);
1682            }
1683
1684            // === UnaryOp variants: this ===
1685            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1686                stack.push(&u.this);
1687            }
1688
1689            // === UnaryFunc variants: this ===
1690            Expression::Upper(f)
1691            | Expression::Lower(f)
1692            | Expression::Length(f)
1693            | Expression::LTrim(f)
1694            | Expression::RTrim(f)
1695            | Expression::Reverse(f)
1696            | Expression::Abs(f)
1697            | Expression::Sqrt(f)
1698            | Expression::Cbrt(f)
1699            | Expression::Ln(f)
1700            | Expression::Exp(f)
1701            | Expression::Sign(f)
1702            | Expression::Date(f)
1703            | Expression::Time(f)
1704            | Expression::DateFromUnixDate(f)
1705            | Expression::UnixDate(f)
1706            | Expression::UnixSeconds(f)
1707            | Expression::UnixMillis(f)
1708            | Expression::UnixMicros(f)
1709            | Expression::TimeStrToDate(f)
1710            | Expression::DateToDi(f)
1711            | Expression::DiToDate(f)
1712            | Expression::TsOrDiToDi(f)
1713            | Expression::TsOrDsToDatetime(f)
1714            | Expression::TsOrDsToTimestamp(f)
1715            | Expression::YearOfWeek(f)
1716            | Expression::YearOfWeekIso(f)
1717            | Expression::Initcap(f)
1718            | Expression::Ascii(f)
1719            | Expression::Chr(f)
1720            | Expression::Soundex(f)
1721            | Expression::ByteLength(f)
1722            | Expression::Hex(f)
1723            | Expression::LowerHex(f)
1724            | Expression::Unicode(f)
1725            | Expression::Radians(f)
1726            | Expression::Degrees(f)
1727            | Expression::Sin(f)
1728            | Expression::Cos(f)
1729            | Expression::Tan(f)
1730            | Expression::Asin(f)
1731            | Expression::Acos(f)
1732            | Expression::Atan(f)
1733            | Expression::IsNan(f)
1734            | Expression::IsInf(f)
1735            | Expression::ArrayLength(f)
1736            | Expression::ArraySize(f)
1737            | Expression::Cardinality(f)
1738            | Expression::ArrayReverse(f)
1739            | Expression::ArrayDistinct(f)
1740            | Expression::ArrayFlatten(f)
1741            | Expression::ArrayCompact(f)
1742            | Expression::Explode(f)
1743            | Expression::ExplodeOuter(f)
1744            | Expression::ToArray(f)
1745            | Expression::MapFromEntries(f)
1746            | Expression::MapKeys(f)
1747            | Expression::MapValues(f)
1748            | Expression::JsonArrayLength(f)
1749            | Expression::JsonKeys(f)
1750            | Expression::JsonType(f)
1751            | Expression::ParseJson(f)
1752            | Expression::ToJson(f)
1753            | Expression::Typeof(f)
1754            | Expression::BitwiseCount(f)
1755            | Expression::Year(f)
1756            | Expression::Month(f)
1757            | Expression::Day(f)
1758            | Expression::Hour(f)
1759            | Expression::Minute(f)
1760            | Expression::Second(f)
1761            | Expression::DayOfWeek(f)
1762            | Expression::DayOfWeekIso(f)
1763            | Expression::DayOfMonth(f)
1764            | Expression::DayOfYear(f)
1765            | Expression::WeekOfYear(f)
1766            | Expression::Quarter(f)
1767            | Expression::Epoch(f)
1768            | Expression::EpochMs(f)
1769            | Expression::TimeStrToUnix(f)
1770            | Expression::SHA(f)
1771            | Expression::SHA1Digest(f)
1772            | Expression::TimeToUnix(f)
1773            | Expression::JSONBool(f)
1774            | Expression::Int64(f)
1775            | Expression::MD5NumberLower64(f)
1776            | Expression::MD5NumberUpper64(f)
1777            | Expression::DateStrToDate(f)
1778            | Expression::DateToDateStr(f) => {
1779                stack.push(&f.this);
1780            }
1781
1782            // === BinaryFunc variants: this, expression ===
1783            Expression::Power(f)
1784            | Expression::NullIf(f)
1785            | Expression::IfNull(f)
1786            | Expression::Nvl(f)
1787            | Expression::UnixToTimeStr(f)
1788            | Expression::Contains(f)
1789            | Expression::StartsWith(f)
1790            | Expression::EndsWith(f)
1791            | Expression::Levenshtein(f)
1792            | Expression::ModFunc(f)
1793            | Expression::Atan2(f)
1794            | Expression::IntDiv(f)
1795            | Expression::AddMonths(f)
1796            | Expression::MonthsBetween(f)
1797            | Expression::NextDay(f)
1798            | Expression::ArrayContains(f)
1799            | Expression::ArrayPosition(f)
1800            | Expression::ArrayAppend(f)
1801            | Expression::ArrayPrepend(f)
1802            | Expression::ArrayUnion(f)
1803            | Expression::ArrayExcept(f)
1804            | Expression::ArrayRemove(f)
1805            | Expression::StarMap(f)
1806            | Expression::MapFromArrays(f)
1807            | Expression::MapContainsKey(f)
1808            | Expression::ElementAt(f)
1809            | Expression::JsonMergePatch(f)
1810            | Expression::JSONBContains(f)
1811            | Expression::JSONBExtract(f) => {
1812                stack.push(&f.this);
1813                stack.push(&f.expression);
1814            }
1815
1816            // === VarArgFunc variants: expressions ===
1817            Expression::Greatest(f)
1818            | Expression::Least(f)
1819            | Expression::Coalesce(f)
1820            | Expression::ArrayConcat(f)
1821            | Expression::ArrayIntersect(f)
1822            | Expression::ArrayZip(f)
1823            | Expression::MapConcat(f)
1824            | Expression::JsonArray(f) => {
1825                for e in &f.expressions {
1826                    stack.push(e);
1827                }
1828            }
1829
1830            // === AggFunc variants: this, filter, having_max, limit ===
1831            Expression::Sum(f)
1832            | Expression::Avg(f)
1833            | Expression::Min(f)
1834            | Expression::Max(f)
1835            | Expression::ArrayAgg(f)
1836            | Expression::CountIf(f)
1837            | Expression::Stddev(f)
1838            | Expression::StddevPop(f)
1839            | Expression::StddevSamp(f)
1840            | Expression::Variance(f)
1841            | Expression::VarPop(f)
1842            | Expression::VarSamp(f)
1843            | Expression::Median(f)
1844            | Expression::Mode(f)
1845            | Expression::First(f)
1846            | Expression::Last(f)
1847            | Expression::AnyValue(f)
1848            | Expression::ApproxDistinct(f)
1849            | Expression::ApproxCountDistinct(f)
1850            | Expression::LogicalAnd(f)
1851            | Expression::LogicalOr(f)
1852            | Expression::Skewness(f)
1853            | Expression::ArrayConcatAgg(f)
1854            | Expression::ArrayUniqueAgg(f)
1855            | Expression::BoolXorAgg(f)
1856            | Expression::BitwiseAndAgg(f)
1857            | Expression::BitwiseOrAgg(f)
1858            | Expression::BitwiseXorAgg(f) => {
1859                stack.push(&f.this);
1860                if let Some(ref filter) = f.filter {
1861                    stack.push(filter);
1862                }
1863                if let Some((ref expr, _)) = f.having_max {
1864                    stack.push(expr);
1865                }
1866                if let Some(ref limit) = f.limit {
1867                    stack.push(limit);
1868                }
1869            }
1870
1871            // === Generic Function / AggregateFunction: args ===
1872            Expression::Function(func) => {
1873                for arg in &func.args {
1874                    stack.push(arg);
1875                }
1876            }
1877            Expression::AggregateFunction(func) => {
1878                for arg in &func.args {
1879                    stack.push(arg);
1880                }
1881                if let Some(ref filter) = func.filter {
1882                    stack.push(filter);
1883                }
1884                if let Some(ref limit) = func.limit {
1885                    stack.push(limit);
1886                }
1887            }
1888
1889            // === WindowFunction: this (skip Over for lineage purposes) ===
1890            Expression::WindowFunction(wf) => {
1891                stack.push(&wf.this);
1892            }
1893
1894            // === Containers and special expressions ===
1895            Expression::Alias(a) => {
1896                stack.push(&a.this);
1897            }
1898            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1899                stack.push(&c.this);
1900                if let Some(ref fmt) = c.format {
1901                    stack.push(fmt);
1902                }
1903                if let Some(ref def) = c.default {
1904                    stack.push(def);
1905                }
1906            }
1907            Expression::Paren(p) => {
1908                stack.push(&p.this);
1909            }
1910            Expression::Annotated(a) => {
1911                stack.push(&a.this);
1912            }
1913            Expression::Case(case) => {
1914                if let Some(ref operand) = case.operand {
1915                    stack.push(operand);
1916                }
1917                for (cond, result) in &case.whens {
1918                    stack.push(cond);
1919                    stack.push(result);
1920                }
1921                if let Some(ref else_expr) = case.else_ {
1922                    stack.push(else_expr);
1923                }
1924            }
1925            Expression::Collation(c) => {
1926                stack.push(&c.this);
1927            }
1928            Expression::In(i) => {
1929                stack.push(&i.this);
1930                for e in &i.expressions {
1931                    stack.push(e);
1932                }
1933                if let Some(ref q) = i.query {
1934                    stack.push(q);
1935                }
1936                if let Some(ref u) = i.unnest {
1937                    stack.push(u);
1938                }
1939            }
1940            Expression::Between(b) => {
1941                stack.push(&b.this);
1942                stack.push(&b.low);
1943                stack.push(&b.high);
1944            }
1945            Expression::IsNull(n) => {
1946                stack.push(&n.this);
1947            }
1948            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1949                stack.push(&t.this);
1950            }
1951            Expression::IsJson(j) => {
1952                stack.push(&j.this);
1953            }
1954            Expression::Like(l) | Expression::ILike(l) => {
1955                stack.push(&l.left);
1956                stack.push(&l.right);
1957                if let Some(ref esc) = l.escape {
1958                    stack.push(esc);
1959                }
1960            }
1961            Expression::SimilarTo(s) => {
1962                stack.push(&s.this);
1963                stack.push(&s.pattern);
1964                if let Some(ref esc) = s.escape {
1965                    stack.push(esc);
1966                }
1967            }
1968            Expression::Ordered(o) => {
1969                stack.push(&o.this);
1970            }
1971            Expression::Array(a) => {
1972                for e in &a.expressions {
1973                    stack.push(e);
1974                }
1975            }
1976            Expression::Tuple(t) => {
1977                for e in &t.expressions {
1978                    stack.push(e);
1979                }
1980            }
1981            Expression::Struct(s) => {
1982                for (_, e) in &s.fields {
1983                    stack.push(e);
1984                }
1985            }
1986            Expression::Subscript(s) => {
1987                stack.push(&s.this);
1988                stack.push(&s.index);
1989            }
1990            Expression::Dot(d) => {
1991                stack.push(&d.this);
1992            }
1993            Expression::MethodCall(m) => {
1994                if !matches!(dialect, Some(DialectType::BigQuery))
1995                    || !is_bigquery_safe_namespace_receiver(&m.this)
1996                {
1997                    stack.push(&m.this);
1998                }
1999                for arg in &m.args {
2000                    stack.push(arg);
2001                }
2002            }
2003            Expression::ArraySlice(s) => {
2004                stack.push(&s.this);
2005                if let Some(ref start) = s.start {
2006                    stack.push(start);
2007                }
2008                if let Some(ref end) = s.end {
2009                    stack.push(end);
2010                }
2011            }
2012            Expression::Lambda(l) => {
2013                stack.push(&l.body);
2014            }
2015            Expression::NamedArgument(n) => {
2016                stack.push(&n.value);
2017            }
2018            Expression::Lateral(l) => {
2019                stack.push(&l.this);
2020                if let Some(ref view) = l.view {
2021                    stack.push(view);
2022                }
2023                if let Some(ref outer) = l.outer {
2024                    stack.push(outer);
2025                }
2026                if let Some(ref ordinality) = l.ordinality {
2027                    stack.push(ordinality);
2028                }
2029            }
2030            Expression::LateralView(lv) => {
2031                stack.push(&lv.this);
2032            }
2033            Expression::TryCatch(t) => {
2034                for stmt in &t.try_body {
2035                    stack.push(stmt);
2036                }
2037                if let Some(catch_body) = &t.catch_body {
2038                    for stmt in catch_body {
2039                        stack.push(stmt);
2040                    }
2041                }
2042            }
2043            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2044                stack.push(e);
2045            }
2046
2047            // === Custom function structs ===
2048            Expression::Substring(f) => {
2049                stack.push(&f.this);
2050                stack.push(&f.start);
2051                if let Some(ref len) = f.length {
2052                    stack.push(len);
2053                }
2054            }
2055            Expression::Trim(f) => {
2056                stack.push(&f.this);
2057                if let Some(ref chars) = f.characters {
2058                    stack.push(chars);
2059                }
2060            }
2061            Expression::Replace(f) => {
2062                stack.push(&f.this);
2063                stack.push(&f.old);
2064                stack.push(&f.new);
2065            }
2066            Expression::IfFunc(f) => {
2067                stack.push(&f.condition);
2068                stack.push(&f.true_value);
2069                if let Some(ref fv) = f.false_value {
2070                    stack.push(fv);
2071                }
2072            }
2073            Expression::Nvl2(f) => {
2074                stack.push(&f.this);
2075                stack.push(&f.true_value);
2076                stack.push(&f.false_value);
2077            }
2078            Expression::ConcatWs(f) => {
2079                stack.push(&f.separator);
2080                for e in &f.expressions {
2081                    stack.push(e);
2082                }
2083            }
2084            Expression::Count(f) => {
2085                if let Some(ref this) = f.this {
2086                    stack.push(this);
2087                }
2088                if let Some(ref filter) = f.filter {
2089                    stack.push(filter);
2090                }
2091            }
2092            Expression::GroupConcat(f) => {
2093                stack.push(&f.this);
2094                if let Some(ref sep) = f.separator {
2095                    stack.push(sep);
2096                }
2097                if let Some(ref filter) = f.filter {
2098                    stack.push(filter);
2099                }
2100            }
2101            Expression::StringAgg(f) => {
2102                stack.push(&f.this);
2103                if let Some(ref sep) = f.separator {
2104                    stack.push(sep);
2105                }
2106                if let Some(ref filter) = f.filter {
2107                    stack.push(filter);
2108                }
2109                if let Some(ref limit) = f.limit {
2110                    stack.push(limit);
2111                }
2112            }
2113            Expression::ListAgg(f) => {
2114                stack.push(&f.this);
2115                if let Some(ref sep) = f.separator {
2116                    stack.push(sep);
2117                }
2118                if let Some(ref filter) = f.filter {
2119                    stack.push(filter);
2120                }
2121            }
2122            Expression::SumIf(f) => {
2123                stack.push(&f.this);
2124                stack.push(&f.condition);
2125                if let Some(ref filter) = f.filter {
2126                    stack.push(filter);
2127                }
2128            }
2129            Expression::DateAdd(f) | Expression::DateSub(f) => {
2130                stack.push(&f.this);
2131                stack.push(&f.interval);
2132            }
2133            Expression::DateDiff(f) => {
2134                stack.push(&f.this);
2135                stack.push(&f.expression);
2136            }
2137            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2138                stack.push(&f.this);
2139            }
2140            Expression::Extract(f) => {
2141                stack.push(&f.this);
2142            }
2143            Expression::Round(f) => {
2144                stack.push(&f.this);
2145                if let Some(ref d) = f.decimals {
2146                    stack.push(d);
2147                }
2148            }
2149            Expression::Floor(f) => {
2150                stack.push(&f.this);
2151                if let Some(ref s) = f.scale {
2152                    stack.push(s);
2153                }
2154                if let Some(ref t) = f.to {
2155                    stack.push(t);
2156                }
2157            }
2158            Expression::Ceil(f) => {
2159                stack.push(&f.this);
2160                if let Some(ref d) = f.decimals {
2161                    stack.push(d);
2162                }
2163                if let Some(ref t) = f.to {
2164                    stack.push(t);
2165                }
2166            }
2167            Expression::Log(f) => {
2168                stack.push(&f.this);
2169                if let Some(ref b) = f.base {
2170                    stack.push(b);
2171                }
2172            }
2173            Expression::AtTimeZone(f) => {
2174                stack.push(&f.this);
2175                stack.push(&f.zone);
2176            }
2177            Expression::Lead(f) | Expression::Lag(f) => {
2178                stack.push(&f.this);
2179                if let Some(ref off) = f.offset {
2180                    stack.push(off);
2181                }
2182                if let Some(ref def) = f.default {
2183                    stack.push(def);
2184                }
2185            }
2186            Expression::FirstValue(f) | Expression::LastValue(f) => {
2187                stack.push(&f.this);
2188            }
2189            Expression::NthValue(f) => {
2190                stack.push(&f.this);
2191                stack.push(&f.offset);
2192            }
2193            Expression::Position(f) => {
2194                stack.push(&f.substring);
2195                stack.push(&f.string);
2196                if let Some(ref start) = f.start {
2197                    stack.push(start);
2198                }
2199            }
2200            Expression::Decode(f) => {
2201                stack.push(&f.this);
2202                for (search, result) in &f.search_results {
2203                    stack.push(search);
2204                    stack.push(result);
2205                }
2206                if let Some(ref def) = f.default {
2207                    stack.push(def);
2208                }
2209            }
2210            Expression::CharFunc(f) => {
2211                for arg in &f.args {
2212                    stack.push(arg);
2213                }
2214            }
2215            Expression::ArraySort(f) => {
2216                stack.push(&f.this);
2217                if let Some(ref cmp) = f.comparator {
2218                    stack.push(cmp);
2219                }
2220            }
2221            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2222                stack.push(&f.this);
2223                stack.push(&f.separator);
2224                if let Some(ref nr) = f.null_replacement {
2225                    stack.push(nr);
2226                }
2227            }
2228            Expression::ArrayFilter(f) => {
2229                stack.push(&f.this);
2230                stack.push(&f.filter);
2231            }
2232            Expression::ArrayTransform(f) => {
2233                stack.push(&f.this);
2234                stack.push(&f.transform);
2235            }
2236            Expression::Sequence(f)
2237            | Expression::Generate(f)
2238            | Expression::ExplodingGenerateSeries(f) => {
2239                stack.push(&f.start);
2240                stack.push(&f.stop);
2241                if let Some(ref step) = f.step {
2242                    stack.push(step);
2243                }
2244            }
2245            Expression::JsonExtract(f)
2246            | Expression::JsonExtractScalar(f)
2247            | Expression::JsonQuery(f)
2248            | Expression::JsonValue(f) => {
2249                stack.push(&f.this);
2250                stack.push(&f.path);
2251            }
2252            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2253                stack.push(&f.this);
2254                for p in &f.paths {
2255                    stack.push(p);
2256                }
2257            }
2258            Expression::JsonObject(f) => {
2259                for (k, v) in &f.pairs {
2260                    stack.push(k);
2261                    stack.push(v);
2262                }
2263            }
2264            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2265                stack.push(&f.this);
2266                for (path, val) in &f.path_values {
2267                    stack.push(path);
2268                    stack.push(val);
2269                }
2270            }
2271            Expression::Overlay(f) => {
2272                stack.push(&f.this);
2273                stack.push(&f.replacement);
2274                stack.push(&f.from);
2275                if let Some(ref len) = f.length {
2276                    stack.push(len);
2277                }
2278            }
2279            Expression::Convert(f) => {
2280                stack.push(&f.this);
2281                if let Some(ref style) = f.style {
2282                    stack.push(style);
2283                }
2284            }
2285            Expression::ApproxPercentile(f) => {
2286                stack.push(&f.this);
2287                stack.push(&f.percentile);
2288                if let Some(ref acc) = f.accuracy {
2289                    stack.push(acc);
2290                }
2291                if let Some(ref filter) = f.filter {
2292                    stack.push(filter);
2293                }
2294            }
2295            Expression::Percentile(f)
2296            | Expression::PercentileCont(f)
2297            | Expression::PercentileDisc(f) => {
2298                stack.push(&f.this);
2299                stack.push(&f.percentile);
2300                if let Some(ref filter) = f.filter {
2301                    stack.push(filter);
2302                }
2303            }
2304            Expression::WithinGroup(f) => {
2305                stack.push(&f.this);
2306            }
2307            Expression::Left(f) | Expression::Right(f) => {
2308                stack.push(&f.this);
2309                stack.push(&f.length);
2310            }
2311            Expression::Repeat(f) => {
2312                stack.push(&f.this);
2313                stack.push(&f.times);
2314            }
2315            Expression::Lpad(f) | Expression::Rpad(f) => {
2316                stack.push(&f.this);
2317                stack.push(&f.length);
2318                if let Some(ref fill) = f.fill {
2319                    stack.push(fill);
2320                }
2321            }
2322            Expression::Split(f) => {
2323                stack.push(&f.this);
2324                stack.push(&f.delimiter);
2325            }
2326            Expression::RegexpLike(f) => {
2327                stack.push(&f.this);
2328                stack.push(&f.pattern);
2329                if let Some(ref flags) = f.flags {
2330                    stack.push(flags);
2331                }
2332            }
2333            Expression::RegexpReplace(f) => {
2334                stack.push(&f.this);
2335                stack.push(&f.pattern);
2336                stack.push(&f.replacement);
2337                if let Some(ref flags) = f.flags {
2338                    stack.push(flags);
2339                }
2340            }
2341            Expression::RegexpExtract(f) => {
2342                stack.push(&f.this);
2343                stack.push(&f.pattern);
2344                if let Some(ref group) = f.group {
2345                    stack.push(group);
2346                }
2347            }
2348            Expression::ToDate(f) => {
2349                stack.push(&f.this);
2350                if let Some(ref fmt) = f.format {
2351                    stack.push(fmt);
2352                }
2353            }
2354            Expression::ToTimestamp(f) => {
2355                stack.push(&f.this);
2356                if let Some(ref fmt) = f.format {
2357                    stack.push(fmt);
2358                }
2359            }
2360            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2361                stack.push(&f.this);
2362                stack.push(&f.format);
2363            }
2364            Expression::LastDay(f) => {
2365                stack.push(&f.this);
2366            }
2367            Expression::FromUnixtime(f) => {
2368                stack.push(&f.this);
2369                if let Some(ref fmt) = f.format {
2370                    stack.push(fmt);
2371                }
2372            }
2373            Expression::UnixTimestamp(f) => {
2374                if let Some(ref this) = f.this {
2375                    stack.push(this);
2376                }
2377                if let Some(ref fmt) = f.format {
2378                    stack.push(fmt);
2379                }
2380            }
2381            Expression::MakeDate(f) => {
2382                stack.push(&f.year);
2383                stack.push(&f.month);
2384                stack.push(&f.day);
2385            }
2386            Expression::MakeTimestamp(f) => {
2387                stack.push(&f.year);
2388                stack.push(&f.month);
2389                stack.push(&f.day);
2390                stack.push(&f.hour);
2391                stack.push(&f.minute);
2392                stack.push(&f.second);
2393                if let Some(ref tz) = f.timezone {
2394                    stack.push(tz);
2395                }
2396            }
2397            Expression::TruncFunc(f) => {
2398                stack.push(&f.this);
2399                if let Some(ref d) = f.decimals {
2400                    stack.push(d);
2401                }
2402            }
2403            Expression::ArrayFunc(f) => {
2404                for e in &f.expressions {
2405                    stack.push(e);
2406                }
2407            }
2408            Expression::Unnest(f) => {
2409                stack.push(&f.this);
2410                for e in &f.expressions {
2411                    stack.push(e);
2412                }
2413            }
2414            Expression::StructFunc(f) => {
2415                for (_, e) in &f.fields {
2416                    stack.push(e);
2417                }
2418            }
2419            Expression::StructExtract(f) => {
2420                stack.push(&f.this);
2421            }
2422            Expression::NamedStruct(f) => {
2423                for (k, v) in &f.pairs {
2424                    stack.push(k);
2425                    stack.push(v);
2426                }
2427            }
2428            Expression::MapFunc(f) => {
2429                for k in &f.keys {
2430                    stack.push(k);
2431                }
2432                for v in &f.values {
2433                    stack.push(v);
2434                }
2435            }
2436            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2437                stack.push(&f.this);
2438                stack.push(&f.transform);
2439            }
2440            Expression::JsonArrayAgg(f) => {
2441                stack.push(&f.this);
2442                if let Some(ref filter) = f.filter {
2443                    stack.push(filter);
2444                }
2445            }
2446            Expression::JsonObjectAgg(f) => {
2447                stack.push(&f.key);
2448                stack.push(&f.value);
2449                if let Some(ref filter) = f.filter {
2450                    stack.push(filter);
2451                }
2452            }
2453            Expression::NTile(f) => {
2454                if let Some(ref n) = f.num_buckets {
2455                    stack.push(n);
2456                }
2457            }
2458            Expression::Rand(f) => {
2459                if let Some(ref s) = f.seed {
2460                    stack.push(s);
2461                }
2462                if let Some(ref lo) = f.lower {
2463                    stack.push(lo);
2464                }
2465                if let Some(ref hi) = f.upper {
2466                    stack.push(hi);
2467                }
2468            }
2469            Expression::Any(q) | Expression::All(q) => {
2470                stack.push(&q.this);
2471                stack.push(&q.subquery);
2472            }
2473            Expression::Overlaps(o) => {
2474                if let Some(ref this) = o.this {
2475                    stack.push(this);
2476                }
2477                if let Some(ref expr) = o.expression {
2478                    stack.push(expr);
2479                }
2480                if let Some(ref ls) = o.left_start {
2481                    stack.push(ls);
2482                }
2483                if let Some(ref le) = o.left_end {
2484                    stack.push(le);
2485                }
2486                if let Some(ref rs) = o.right_start {
2487                    stack.push(rs);
2488                }
2489                if let Some(ref re) = o.right_end {
2490                    stack.push(re);
2491                }
2492            }
2493            Expression::Interval(i) => {
2494                if let Some(ref this) = i.this {
2495                    stack.push(this);
2496                }
2497            }
2498            Expression::TimeStrToTime(f) => {
2499                stack.push(&f.this);
2500                if let Some(ref zone) = f.zone {
2501                    stack.push(zone);
2502                }
2503            }
2504            Expression::JSONBExtractScalar(f) => {
2505                stack.push(&f.this);
2506                stack.push(&f.expression);
2507                if let Some(ref jt) = f.json_type {
2508                    stack.push(jt);
2509                }
2510            }
2511            Expression::JSONExtract(f) => {
2512                stack.push(&f.this);
2513                stack.push(&f.expression);
2514                for e in &f.expressions {
2515                    stack.push(e);
2516                }
2517                if let Some(ref option) = f.option {
2518                    stack.push(option);
2519                }
2520                if let Some(ref on_condition) = f.on_condition {
2521                    stack.push(on_condition);
2522                }
2523            }
2524
2525            // === True leaves and non-expression-bearing nodes ===
2526            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2527            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2528            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2529            _ => {}
2530        }
2531    }
2532}
2533
2534// ---------------------------------------------------------------------------
2535// Tests
2536// ---------------------------------------------------------------------------
2537
2538#[cfg(test)]
2539mod tests {
2540    use super::*;
2541    use crate::dialects::{Dialect, DialectType};
2542    use crate::expressions::DataType;
2543    use crate::optimizer::annotate_types::annotate_types;
2544    use crate::parse_one;
2545    use crate::schema::{MappingSchema, Schema};
2546
2547    fn parse(sql: &str) -> Expression {
2548        let dialect = Dialect::get(DialectType::Generic);
2549        let ast = dialect.parse(sql).unwrap();
2550        ast.into_iter().next().unwrap()
2551    }
2552
2553    #[test]
2554    fn test_simple_lineage() {
2555        let expr = parse("SELECT a FROM t");
2556        let node = lineage("a", &expr, None, false).unwrap();
2557
2558        assert_eq!(node.name, "a");
2559        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2560        // Should trace to t.a
2561        let names = node.downstream_names();
2562        assert!(
2563            names.iter().any(|n| n == "t.a"),
2564            "Expected t.a in downstream, got: {:?}",
2565            names
2566        );
2567    }
2568
2569    #[test]
2570    fn test_lineage_walk() {
2571        let root = LineageNode {
2572            name: "col_a".to_string(),
2573            expression: Expression::Null(crate::expressions::Null),
2574            source: Expression::Null(crate::expressions::Null),
2575            downstream: vec![LineageNode::new(
2576                "t.a",
2577                Expression::Null(crate::expressions::Null),
2578                Expression::Null(crate::expressions::Null),
2579            )],
2580            source_name: String::new(),
2581            source_kind: SourceKind::Unknown,
2582            source_alias: None,
2583            reference_node_name: String::new(),
2584        };
2585
2586        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2587        assert_eq!(names.len(), 2);
2588        assert_eq!(names[0], "col_a");
2589        assert_eq!(names[1], "t.a");
2590    }
2591
2592    #[test]
2593    fn test_aliased_column() {
2594        let expr = parse("SELECT a + 1 AS b FROM t");
2595        let node = lineage("b", &expr, None, false).unwrap();
2596
2597        assert_eq!(node.name, "b");
2598        // Should trace through the expression to t.a
2599        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2600        assert!(
2601            all_names.iter().any(|n| n.contains("a")),
2602            "Expected to trace to column a, got: {:?}",
2603            all_names
2604        );
2605    }
2606
2607    #[test]
2608    fn test_qualified_column() {
2609        let expr = parse("SELECT t.a FROM t");
2610        let node = lineage("a", &expr, None, false).unwrap();
2611
2612        assert_eq!(node.name, "a");
2613        let names = node.downstream_names();
2614        assert!(
2615            names.iter().any(|n| n == "t.a"),
2616            "Expected t.a, got: {:?}",
2617            names
2618        );
2619    }
2620
2621    #[test]
2622    fn test_unqualified_column() {
2623        let expr = parse("SELECT a FROM t");
2624        let node = lineage("a", &expr, None, false).unwrap();
2625
2626        // Unqualified but single source → resolved to t.a
2627        let names = node.downstream_names();
2628        assert!(
2629            names.iter().any(|n| n == "t.a"),
2630            "Expected t.a, got: {:?}",
2631            names
2632        );
2633    }
2634
2635    #[test]
2636    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2637        let query = "SELECT name FROM users";
2638        let dialect = Dialect::get(DialectType::BigQuery);
2639        let expr = dialect
2640            .parse(query)
2641            .unwrap()
2642            .into_iter()
2643            .next()
2644            .expect("expected one expression");
2645
2646        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2647        schema
2648            .add_table("users", &[("name".into(), DataType::Text)], None)
2649            .expect("schema setup");
2650
2651        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2652            .expect("lineage without schema");
2653        let mut expr_without = node_without_schema.expression.clone();
2654        annotate_types(
2655            &mut expr_without,
2656            Some(&schema),
2657            Some(DialectType::BigQuery),
2658        );
2659        assert_eq!(
2660            expr_without.inferred_type(),
2661            None,
2662            "Expected unresolved root type without schema-aware lineage qualification"
2663        );
2664
2665        let node_with_schema = lineage_with_schema(
2666            "name",
2667            &expr,
2668            Some(&schema),
2669            Some(DialectType::BigQuery),
2670            false,
2671        )
2672        .expect("lineage with schema");
2673        let mut expr_with = node_with_schema.expression.clone();
2674        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2675
2676        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2677    }
2678
2679    #[test]
2680    fn test_lineage_with_schema_correlated_scalar_subquery() {
2681        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2682        let dialect = Dialect::get(DialectType::BigQuery);
2683        let expr = dialect
2684            .parse(query)
2685            .unwrap()
2686            .into_iter()
2687            .next()
2688            .expect("expected one expression");
2689
2690        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2691        schema
2692            .add_table(
2693                "t1",
2694                &[("id".into(), DataType::BigInt { length: None })],
2695                None,
2696            )
2697            .expect("schema setup");
2698        schema
2699            .add_table(
2700                "t2",
2701                &[
2702                    ("id".into(), DataType::BigInt { length: None }),
2703                    ("val".into(), DataType::BigInt { length: None }),
2704                ],
2705                None,
2706            )
2707            .expect("schema setup");
2708
2709        let node = lineage_with_schema(
2710            "id",
2711            &expr,
2712            Some(&schema),
2713            Some(DialectType::BigQuery),
2714            false,
2715        )
2716        .expect("lineage_with_schema should handle correlated scalar subqueries");
2717
2718        assert_eq!(node.name, "id");
2719    }
2720
2721    #[test]
2722    fn test_lineage_with_schema_join_using() {
2723        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2724        let dialect = Dialect::get(DialectType::BigQuery);
2725        let expr = dialect
2726            .parse(query)
2727            .unwrap()
2728            .into_iter()
2729            .next()
2730            .expect("expected one expression");
2731
2732        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2733        schema
2734            .add_table(
2735                "t1",
2736                &[("a".into(), DataType::BigInt { length: None })],
2737                None,
2738            )
2739            .expect("schema setup");
2740        schema
2741            .add_table(
2742                "t2",
2743                &[("a".into(), DataType::BigInt { length: None })],
2744                None,
2745            )
2746            .expect("schema setup");
2747
2748        let node = lineage_with_schema(
2749            "a",
2750            &expr,
2751            Some(&schema),
2752            Some(DialectType::BigQuery),
2753            false,
2754        )
2755        .expect("lineage_with_schema should handle JOIN USING");
2756
2757        assert_eq!(node.name, "a");
2758    }
2759
2760    #[test]
2761    fn test_lineage_with_schema_qualified_table_name() {
2762        let query = "SELECT a FROM raw.t1";
2763        let dialect = Dialect::get(DialectType::BigQuery);
2764        let expr = dialect
2765            .parse(query)
2766            .unwrap()
2767            .into_iter()
2768            .next()
2769            .expect("expected one expression");
2770
2771        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2772        schema
2773            .add_table(
2774                "raw.t1",
2775                &[("a".into(), DataType::BigInt { length: None })],
2776                None,
2777            )
2778            .expect("schema setup");
2779
2780        let node = lineage_with_schema(
2781            "a",
2782            &expr,
2783            Some(&schema),
2784            Some(DialectType::BigQuery),
2785            false,
2786        )
2787        .expect("lineage_with_schema should handle dotted schema.table names");
2788
2789        assert_eq!(node.name, "a");
2790    }
2791
2792    #[test]
2793    fn test_lineage_with_schema_none_matches_lineage() {
2794        let expr = parse("SELECT a FROM t");
2795        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2796        let with_none =
2797            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2798
2799        assert_eq!(with_none.name, baseline.name);
2800        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2801    }
2802
2803    #[test]
2804    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2805        let dialect = Dialect::get(DialectType::BigQuery);
2806        let expr = dialect
2807            .parse("SELECT Name AS name FROM teams")
2808            .unwrap()
2809            .into_iter()
2810            .next()
2811            .expect("expected one expression");
2812
2813        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2814        schema
2815            .add_table(
2816                "teams",
2817                &[("Name".into(), DataType::String { length: None })],
2818                None,
2819            )
2820            .expect("schema setup");
2821
2822        let node = lineage_with_schema(
2823            "name",
2824            &expr,
2825            Some(&schema),
2826            Some(DialectType::BigQuery),
2827            false,
2828        )
2829        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2830
2831        let names = node.downstream_names();
2832        assert!(
2833            names.iter().any(|n| n == "teams.Name"),
2834            "Expected teams.Name in downstream, got: {:?}",
2835            names
2836        );
2837    }
2838
2839    #[test]
2840    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2841        let dialect = Dialect::get(DialectType::BigQuery);
2842        let expr = dialect
2843            .parse("SELECT Name AS Name FROM teams")
2844            .unwrap()
2845            .into_iter()
2846            .next()
2847            .expect("expected one expression");
2848
2849        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2850            .expect("lineage should resolve mixed-case aliases in BigQuery");
2851
2852        assert_eq!(node.name, "name");
2853    }
2854
2855    #[test]
2856    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2857        let expr = parse_one(
2858            r#"
2859SELECT date_val AS week_start
2860FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2861"#,
2862            DialectType::BigQuery,
2863        )
2864        .expect("parse");
2865
2866        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2867            .expect("lineage should resolve UNNEST alias as a source");
2868        let child = node
2869            .downstream
2870            .first()
2871            .expect("week_start should have downstream lineage");
2872
2873        assert_eq!(child.name, "_0.date_val");
2874        assert_eq!(child.source_name, "_0");
2875        assert_eq!(child.source_kind, SourceKind::Virtual);
2876        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2877
2878        let Expression::Column(column) = &child.expression else {
2879            panic!(
2880                "expected downstream column expression, got {:?}",
2881                child.expression
2882            );
2883        };
2884        assert_eq!(column.name.name, "date_val");
2885        assert_eq!(
2886            column.table.as_ref().map(|table| table.name.as_str()),
2887            Some("_0")
2888        );
2889        assert!(
2890            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2891            "expected UNNEST source expression, got {:?}",
2892            child.source
2893        );
2894    }
2895
2896    #[test]
2897    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2898        let expr =
2899            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2900
2901        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2902        let child = node.downstream.first().expect("id should have lineage");
2903
2904        assert_eq!(child.name, "date_val.id");
2905        assert_eq!(child.source_name, "date_val");
2906        assert_eq!(child.source_kind, SourceKind::Table);
2907        assert_eq!(child.source_alias, None);
2908    }
2909
2910    #[test]
2911    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2912        let expr = parse_one(
2913            r#"
2914SELECT a.a AS first_value, b.b AS second_value
2915FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2916JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2917"#,
2918            DialectType::BigQuery,
2919        )
2920        .expect("parse");
2921
2922        let first =
2923            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2924        let second =
2925            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2926
2927        let first_child = first.downstream.first().expect("first source");
2928        let second_child = second.downstream.first().expect("second source");
2929
2930        assert_eq!(first_child.name, "_0.a");
2931        assert_eq!(first_child.source_name, "_0");
2932        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2933        assert_eq!(first_child.source_kind, SourceKind::Virtual);
2934
2935        assert_eq!(second_child.name, "_1.b");
2936        assert_eq!(second_child.source_name, "_1");
2937        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2938        assert_eq!(second_child.source_kind, SourceKind::Virtual);
2939    }
2940
2941    #[test]
2942    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2943        let expr = parse_one(
2944            r#"
2945SELECT item.item AS item
2946FROM t JOIN UNNEST(t.items) AS item ON TRUE
2947"#,
2948            DialectType::BigQuery,
2949        )
2950        .expect("parse");
2951
2952        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2953        let virtual_child = node.downstream.first().expect("virtual item source");
2954        assert_eq!(virtual_child.name, "_0.item");
2955        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2956
2957        let real_child = virtual_child
2958            .downstream
2959            .first()
2960            .expect("UNNEST(t.items) should depend on t.items");
2961        assert_eq!(real_child.name, "t.items");
2962        assert_eq!(real_child.source_name, "t");
2963        assert_eq!(real_child.source_kind, SourceKind::Table);
2964    }
2965
2966    #[test]
2967    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
2968        let expr = parse_one(
2969            r#"
2970SELECT item AS item
2971FROM t JOIN UNNEST(t.items) AS item ON TRUE
2972"#,
2973            DialectType::BigQuery,
2974        )
2975        .expect("parse");
2976
2977        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2978        let virtual_child = node.downstream.first().expect("virtual item source");
2979        assert_eq!(virtual_child.name, "_0.item");
2980        assert_eq!(virtual_child.source_name, "_0");
2981        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2982        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
2983
2984        let real_child = virtual_child
2985            .downstream
2986            .first()
2987            .expect("UNNEST(t.items) should depend on t.items");
2988        assert_eq!(real_child.name, "t.items");
2989        assert_eq!(real_child.source_name, "t");
2990        assert_eq!(real_child.source_kind, SourceKind::Table);
2991    }
2992
2993    #[test]
2994    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
2995        let cases = [
2996            (
2997                DialectType::PostgreSQL,
2998                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
2999            ),
3000            (
3001                DialectType::Presto,
3002                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3003            ),
3004            (
3005                DialectType::Trino,
3006                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3007            ),
3008        ];
3009
3010        for (dialect, sql) in cases {
3011            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3012            let node = lineage("out", &expr, Some(dialect), false)
3013                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3014            let virtual_child = node
3015                .downstream
3016                .first()
3017                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3018
3019            assert_eq!(
3020                virtual_child.name, "_0.x",
3021                "unexpected virtual child for {dialect:?}"
3022            );
3023            assert_eq!(virtual_child.source_name, "_0");
3024            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3025            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3026
3027            let real_child = virtual_child
3028                .downstream
3029                .first()
3030                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3031            assert_eq!(real_child.name, "t.items");
3032            assert_eq!(real_child.source_kind, SourceKind::Table);
3033        }
3034    }
3035
3036    #[test]
3037    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3038        let cases = [
3039            (
3040                DialectType::Spark,
3041                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3042            ),
3043            (
3044                DialectType::Hive,
3045                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3046            ),
3047        ];
3048
3049        for (dialect, sql) in cases {
3050            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3051            let node = lineage("out", &expr, Some(dialect), false)
3052                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3053            let virtual_child = node
3054                .downstream
3055                .first()
3056                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3057
3058            assert_eq!(virtual_child.name, "_0.x");
3059            assert_eq!(virtual_child.source_name, "_0");
3060            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3061            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3062
3063            let real_child = virtual_child
3064                .downstream
3065                .first()
3066                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3067            assert_eq!(real_child.name, "t.items");
3068            assert_eq!(real_child.source_kind, SourceKind::Table);
3069        }
3070    }
3071
3072    #[test]
3073    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3074        let expr = parse_one(
3075            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3076            DialectType::Snowflake,
3077        )
3078        .expect("parse");
3079
3080        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3081        let virtual_child = node.downstream.first().expect("virtual flatten source");
3082        assert_eq!(virtual_child.name, "_0.value");
3083        assert_eq!(virtual_child.source_name, "_0");
3084        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3085        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3086
3087        let real_child = virtual_child
3088            .downstream
3089            .first()
3090            .expect("FLATTEN input should depend on raw_events.payload");
3091        assert_eq!(real_child.name, "raw_events.payload");
3092        assert_eq!(real_child.source_kind, SourceKind::Table);
3093    }
3094
3095    #[test]
3096    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3097        let expr = parse_one(
3098            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3099            DialectType::Snowflake,
3100        )
3101        .expect("parse");
3102
3103        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3104        schema
3105            .add_table(
3106                "fact.some_daily_metrics",
3107                &[("date_utc".to_string(), DataType::Date)],
3108                None,
3109            )
3110            .expect("schema setup");
3111
3112        let node = lineage_with_schema(
3113            "recency",
3114            &expr,
3115            Some(&schema),
3116            Some(DialectType::Snowflake),
3117            false,
3118        )
3119        .expect("lineage_with_schema should not treat date part as a column");
3120
3121        let names = node.downstream_names();
3122        assert!(
3123            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3124            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3125            names
3126        );
3127        assert!(
3128            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3129            "Did not expect date part to appear as lineage column, got: {:?}",
3130            names
3131        );
3132    }
3133
3134    #[test]
3135    fn test_snowflake_datediff_parses_to_typed_ast() {
3136        let expr = parse_one(
3137            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3138            DialectType::Snowflake,
3139        )
3140        .expect("parse");
3141
3142        match expr {
3143            Expression::Select(select) => match &select.expressions[0] {
3144                Expression::Alias(alias) => match &alias.this {
3145                    Expression::DateDiff(f) => {
3146                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3147                    }
3148                    other => panic!("expected DateDiff, got {other:?}"),
3149                },
3150                other => panic!("expected Alias, got {other:?}"),
3151            },
3152            other => panic!("expected Select, got {other:?}"),
3153        }
3154    }
3155
3156    #[test]
3157    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3158        let expr = parse_one(
3159            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3160            DialectType::Snowflake,
3161        )
3162        .expect("parse");
3163
3164        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3165        schema
3166            .add_table(
3167                "fact.some_daily_metrics",
3168                &[("date_utc".to_string(), DataType::Date)],
3169                None,
3170            )
3171            .expect("schema setup");
3172
3173        let node = lineage_with_schema(
3174            "next_day",
3175            &expr,
3176            Some(&schema),
3177            Some(DialectType::Snowflake),
3178            false,
3179        )
3180        .expect("lineage_with_schema should not treat DATEADD date part as a column");
3181
3182        let names = node.downstream_names();
3183        assert!(
3184            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3185            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3186            names
3187        );
3188        assert!(
3189            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3190            "Did not expect date part to appear as lineage column, got: {:?}",
3191            names
3192        );
3193    }
3194
3195    #[test]
3196    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3197        let expr = parse_one(
3198            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3199            DialectType::Snowflake,
3200        )
3201        .expect("parse");
3202
3203        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3204        schema
3205            .add_table(
3206                "fact.some_daily_metrics",
3207                &[("date_utc".to_string(), DataType::Date)],
3208                None,
3209            )
3210            .expect("schema setup");
3211
3212        let node = lineage_with_schema(
3213            "day_part",
3214            &expr,
3215            Some(&schema),
3216            Some(DialectType::Snowflake),
3217            false,
3218        )
3219        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3220
3221        let names = node.downstream_names();
3222        assert!(
3223            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3224            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3225            names
3226        );
3227        assert!(
3228            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3229            "Did not expect date part to appear as lineage column, got: {:?}",
3230            names
3231        );
3232    }
3233
3234    #[test]
3235    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3236        let expr = parse_one(
3237            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3238            DialectType::Snowflake,
3239        )
3240        .expect("parse");
3241
3242        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3243        schema
3244            .add_table(
3245                "fact.some_daily_metrics",
3246                &[("date_utc".to_string(), DataType::Date)],
3247                None,
3248            )
3249            .expect("schema setup");
3250
3251        let node = lineage_with_schema(
3252            "day_part",
3253            &expr,
3254            Some(&schema),
3255            Some(DialectType::Snowflake),
3256            false,
3257        )
3258        .expect("quoted DATE_PART should continue to work");
3259
3260        let names = node.downstream_names();
3261        assert!(
3262            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3263            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3264            names
3265        );
3266    }
3267
3268    #[test]
3269    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3270        let expr = parse_one(
3271            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3272            DialectType::Snowflake,
3273        )
3274        .expect("parse");
3275
3276        match expr {
3277            Expression::Select(select) => match &select.expressions[0] {
3278                Expression::Alias(alias) => match &alias.this {
3279                    Expression::Function(f) => {
3280                        assert_eq!(f.name.to_uppercase(), "DATEADD");
3281                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3282                    }
3283                    other => panic!("expected generic DATEADD function, got {other:?}"),
3284                },
3285                other => panic!("expected Alias, got {other:?}"),
3286            },
3287            other => panic!("expected Select, got {other:?}"),
3288        }
3289    }
3290
3291    #[test]
3292    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3293        let expr = parse_one(
3294            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3295            DialectType::Snowflake,
3296        )
3297        .expect("parse");
3298
3299        match expr {
3300            Expression::Select(select) => match &select.expressions[0] {
3301                Expression::Alias(alias) => match &alias.this {
3302                    Expression::Function(f) => {
3303                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3304                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3305                    }
3306                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3307                },
3308                other => panic!("expected Alias, got {other:?}"),
3309            },
3310            other => panic!("expected Select, got {other:?}"),
3311        }
3312    }
3313
3314    #[test]
3315    fn test_snowflake_date_part_string_literal_stays_generic_function() {
3316        let expr = parse_one(
3317            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3318            DialectType::Snowflake,
3319        )
3320        .expect("parse");
3321
3322        match expr {
3323            Expression::Select(select) => match &select.expressions[0] {
3324                Expression::Alias(alias) => match &alias.this {
3325                    Expression::Function(f) => {
3326                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3327                    }
3328                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3329                },
3330                other => panic!("expected Alias, got {other:?}"),
3331            },
3332            other => panic!("expected Select, got {other:?}"),
3333        }
3334    }
3335
3336    #[test]
3337    fn test_lineage_join() {
3338        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3339
3340        let node_a = lineage("a", &expr, None, false).unwrap();
3341        let names_a = node_a.downstream_names();
3342        assert!(
3343            names_a.iter().any(|n| n == "t.a"),
3344            "Expected t.a, got: {:?}",
3345            names_a
3346        );
3347
3348        let node_b = lineage("b", &expr, None, false).unwrap();
3349        let names_b = node_b.downstream_names();
3350        assert!(
3351            names_b.iter().any(|n| n == "s.b"),
3352            "Expected s.b, got: {:?}",
3353            names_b
3354        );
3355    }
3356
3357    #[test]
3358    fn test_lineage_alias_leaf_has_resolved_source_name() {
3359        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3360        let node = lineage("col1", &expr, None, false).unwrap();
3361
3362        // Keep alias in the display lineage edge.
3363        let names = node.downstream_names();
3364        assert!(
3365            names.iter().any(|n| n == "t1.col1"),
3366            "Expected aliased column edge t1.col1, got: {:?}",
3367            names
3368        );
3369
3370        // Leaf should expose the resolved base table for consumers.
3371        let leaf = node
3372            .downstream
3373            .iter()
3374            .find(|n| n.name == "t1.col1")
3375            .expect("Expected t1.col1 leaf");
3376        assert_eq!(leaf.source_name, "table1");
3377        match &leaf.source {
3378            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3379            _ => panic!("Expected leaf source to be a table expression"),
3380        }
3381    }
3382
3383    #[test]
3384    fn test_lineage_derived_table() {
3385        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3386        let node = lineage("a", &expr, None, false).unwrap();
3387
3388        assert_eq!(node.name, "a");
3389        // Should trace through the derived table to t.a
3390        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3391        assert!(
3392            all_names.iter().any(|n| n == "t.a"),
3393            "Expected to trace through derived table to t.a, got: {:?}",
3394            all_names
3395        );
3396    }
3397
3398    #[test]
3399    fn test_lineage_cte() {
3400        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3401        let node = lineage("a", &expr, None, false).unwrap();
3402
3403        assert_eq!(node.name, "a");
3404        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3405        assert!(
3406            all_names.iter().any(|n| n == "t.a"),
3407            "Expected to trace through CTE to t.a, got: {:?}",
3408            all_names
3409        );
3410    }
3411
3412    #[test]
3413    fn test_lineage_union() {
3414        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3415        let node = lineage("a", &expr, None, false).unwrap();
3416
3417        assert_eq!(node.name, "a");
3418        // Should have 2 downstream branches
3419        assert_eq!(
3420            node.downstream.len(),
3421            2,
3422            "Expected 2 branches for UNION, got {}",
3423            node.downstream.len()
3424        );
3425    }
3426
3427    #[test]
3428    fn test_lineage_cte_union() {
3429        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3430        let node = lineage("a", &expr, None, false).unwrap();
3431
3432        // Should trace through CTE into both UNION branches
3433        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3434        assert!(
3435            all_names.len() >= 3,
3436            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3437            all_names
3438        );
3439    }
3440
3441    #[test]
3442    fn test_lineage_star() {
3443        let expr = parse("SELECT * FROM t");
3444        let node = lineage("*", &expr, None, false).unwrap();
3445
3446        assert_eq!(node.name, "*");
3447        // Should have downstream for table t
3448        assert!(
3449            !node.downstream.is_empty(),
3450            "Star should produce downstream nodes"
3451        );
3452    }
3453
3454    #[test]
3455    fn test_lineage_subquery_in_select() {
3456        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3457        let node = lineage("x", &expr, None, false).unwrap();
3458
3459        assert_eq!(node.name, "x");
3460        // Should have traced into the scalar subquery
3461        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3462        assert!(
3463            all_names.len() >= 2,
3464            "Expected tracing into scalar subquery, got: {:?}",
3465            all_names
3466        );
3467    }
3468
3469    #[test]
3470    fn test_lineage_multiple_columns() {
3471        let expr = parse("SELECT a, b FROM t");
3472
3473        let node_a = lineage("a", &expr, None, false).unwrap();
3474        let node_b = lineage("b", &expr, None, false).unwrap();
3475
3476        assert_eq!(node_a.name, "a");
3477        assert_eq!(node_b.name, "b");
3478
3479        // Each should trace independently
3480        let names_a = node_a.downstream_names();
3481        let names_b = node_b.downstream_names();
3482        assert!(names_a.iter().any(|n| n == "t.a"));
3483        assert!(names_b.iter().any(|n| n == "t.b"));
3484    }
3485
3486    #[test]
3487    fn test_get_source_tables() {
3488        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3489        let node = lineage("a", &expr, None, false).unwrap();
3490
3491        let tables = get_source_tables(&node);
3492        assert!(
3493            tables.contains("t"),
3494            "Expected source table 't', got: {:?}",
3495            tables
3496        );
3497    }
3498
3499    #[test]
3500    fn test_lineage_column_not_found() {
3501        let expr = parse("SELECT a FROM t");
3502        let result = lineage("nonexistent", &expr, None, false);
3503        assert!(result.is_err());
3504    }
3505
3506    #[test]
3507    fn test_lineage_nested_cte() {
3508        let expr = parse(
3509            "WITH cte1 AS (SELECT a FROM t), \
3510             cte2 AS (SELECT a FROM cte1) \
3511             SELECT a FROM cte2",
3512        );
3513        let node = lineage("a", &expr, None, false).unwrap();
3514
3515        // Should trace through cte2 → cte1 → t
3516        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3517        assert!(
3518            all_names.len() >= 3,
3519            "Expected to trace through nested CTEs, got: {:?}",
3520            all_names
3521        );
3522    }
3523
3524    #[test]
3525    fn test_trim_selects_true() {
3526        let expr = parse("SELECT a, b, c FROM t");
3527        let node = lineage("a", &expr, None, true).unwrap();
3528
3529        // The source should be trimmed to only include 'a'
3530        if let Expression::Select(select) = &node.source {
3531            assert_eq!(
3532                select.expressions.len(),
3533                1,
3534                "Trimmed source should have 1 expression, got {}",
3535                select.expressions.len()
3536            );
3537        } else {
3538            panic!("Expected Select source");
3539        }
3540    }
3541
3542    #[test]
3543    fn test_trim_selects_false() {
3544        let expr = parse("SELECT a, b, c FROM t");
3545        let node = lineage("a", &expr, None, false).unwrap();
3546
3547        // The source should keep all columns
3548        if let Expression::Select(select) = &node.source {
3549            assert_eq!(
3550                select.expressions.len(),
3551                3,
3552                "Untrimmed source should have 3 expressions"
3553            );
3554        } else {
3555            panic!("Expected Select source");
3556        }
3557    }
3558
3559    #[test]
3560    fn test_lineage_expression_in_select() {
3561        let expr = parse("SELECT a + b AS c FROM t");
3562        let node = lineage("c", &expr, None, false).unwrap();
3563
3564        // Should trace to both a and b from t
3565        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3566        assert!(
3567            all_names.len() >= 3,
3568            "Expected to trace a + b to both columns, got: {:?}",
3569            all_names
3570        );
3571    }
3572
3573    #[test]
3574    fn test_set_operation_by_index() {
3575        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3576
3577        // Trace column "a" which is at index 0
3578        let node = lineage("a", &expr, None, false).unwrap();
3579
3580        // UNION branches should be traced by index
3581        assert_eq!(node.downstream.len(), 2);
3582    }
3583
3584    // --- Tests for column lineage inside function calls (issue #18) ---
3585
3586    fn print_node(node: &LineageNode, indent: usize) {
3587        let pad = "  ".repeat(indent);
3588        println!(
3589            "{pad}name={:?} source_name={:?}",
3590            node.name, node.source_name
3591        );
3592        for child in &node.downstream {
3593            print_node(child, indent + 1);
3594        }
3595    }
3596
3597    #[test]
3598    fn test_issue18_repro() {
3599        // Exact scenario from the issue
3600        let query = "SELECT UPPER(name) as upper_name FROM users";
3601        println!("Query: {query}\n");
3602
3603        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3604        let exprs = dialect.parse(query).unwrap();
3605        let expr = &exprs[0];
3606
3607        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3608        println!("lineage(\"upper_name\"):");
3609        print_node(&node, 1);
3610
3611        let names = node.downstream_names();
3612        assert!(
3613            names.iter().any(|n| n == "users.name"),
3614            "Expected users.name in downstream, got: {:?}",
3615            names
3616        );
3617    }
3618
3619    #[test]
3620    fn test_lineage_bigquery_safe_namespace_issue207() {
3621        let query = r#"
3622WITH import_cte AS (
3623  SELECT timestamp, data, operation
3624  FROM `project`.`dataset`.`source_table`
3625),
3626transform_cte AS (
3627  SELECT
3628    timestamp,
3629    SAFE.PARSE_JSON(data) AS json_data
3630  FROM import_cte
3631)
3632SELECT json_data FROM transform_cte
3633"#;
3634        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3635        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3636            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3637        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3638
3639        assert!(
3640            names.iter().any(|name| name == "source_table.data"),
3641            "expected source_table.data in lineage, got {names:?}"
3642        );
3643        assert!(
3644            !names
3645                .iter()
3646                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3647            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3648        );
3649    }
3650
3651    #[test]
3652    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3653        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3654        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3655            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3656        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3657
3658        assert!(
3659            names.iter().any(|name| name == "t.data"),
3660            "expected t.data in lineage, got {names:?}"
3661        );
3662        assert!(
3663            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3664            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3665        );
3666    }
3667
3668    #[test]
3669    fn test_lineage_method_call_receiver_control() {
3670        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3671        let node = lineage("out", &expr, None, false).expect("lineage");
3672        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3673
3674        assert!(
3675            names.iter().any(|name| name == "t.obj"),
3676            "expected ordinary method receiver to remain in lineage, got {names:?}"
3677        );
3678        assert!(
3679            names.iter().any(|name| name == "t.arg"),
3680            "expected method argument in lineage, got {names:?}"
3681        );
3682    }
3683
3684    #[test]
3685    fn test_lineage_upper_function() {
3686        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3687        let node = lineage("upper_name", &expr, None, false).unwrap();
3688
3689        let names = node.downstream_names();
3690        assert!(
3691            names.iter().any(|n| n == "users.name"),
3692            "Expected users.name in downstream, got: {:?}",
3693            names
3694        );
3695    }
3696
3697    #[test]
3698    fn test_lineage_round_function() {
3699        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3700        let node = lineage("rounded", &expr, None, false).unwrap();
3701
3702        let names = node.downstream_names();
3703        assert!(
3704            names.iter().any(|n| n == "products.price"),
3705            "Expected products.price in downstream, got: {:?}",
3706            names
3707        );
3708    }
3709
3710    #[test]
3711    fn test_lineage_coalesce_function() {
3712        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3713        let node = lineage("val", &expr, None, false).unwrap();
3714
3715        let names = node.downstream_names();
3716        assert!(
3717            names.iter().any(|n| n == "t.a"),
3718            "Expected t.a in downstream, got: {:?}",
3719            names
3720        );
3721        assert!(
3722            names.iter().any(|n| n == "t.b"),
3723            "Expected t.b in downstream, got: {:?}",
3724            names
3725        );
3726    }
3727
3728    #[test]
3729    fn test_lineage_count_function() {
3730        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3731        let node = lineage("cnt", &expr, None, false).unwrap();
3732
3733        let names = node.downstream_names();
3734        assert!(
3735            names.iter().any(|n| n == "t.id"),
3736            "Expected t.id in downstream, got: {:?}",
3737            names
3738        );
3739    }
3740
3741    #[test]
3742    fn test_lineage_sum_function() {
3743        let expr = parse("SELECT SUM(amount) AS total FROM t");
3744        let node = lineage("total", &expr, None, false).unwrap();
3745
3746        let names = node.downstream_names();
3747        assert!(
3748            names.iter().any(|n| n == "t.amount"),
3749            "Expected t.amount in downstream, got: {:?}",
3750            names
3751        );
3752    }
3753
3754    #[test]
3755    fn test_lineage_case_with_nested_functions() {
3756        let expr =
3757            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3758        let node = lineage("result", &expr, None, false).unwrap();
3759
3760        let names = node.downstream_names();
3761        assert!(
3762            names.iter().any(|n| n == "t.x"),
3763            "Expected t.x in downstream, got: {:?}",
3764            names
3765        );
3766        assert!(
3767            names.iter().any(|n| n == "t.name"),
3768            "Expected t.name in downstream, got: {:?}",
3769            names
3770        );
3771    }
3772
3773    #[test]
3774    fn test_lineage_substring_function() {
3775        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3776        let node = lineage("short", &expr, None, false).unwrap();
3777
3778        let names = node.downstream_names();
3779        assert!(
3780            names.iter().any(|n| n == "t.name"),
3781            "Expected t.name in downstream, got: {:?}",
3782            names
3783        );
3784    }
3785
3786    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3787
3788    #[test]
3789    fn test_lineage_cte_select_star() {
3790        // Ported from sqlglot: test_lineage_source_with_star
3791        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3792        // After star expansion: SELECT y.a AS a FROM y
3793        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3794        let node = lineage("a", &expr, None, false).unwrap();
3795
3796        assert_eq!(node.name, "a");
3797        // Should successfully resolve column 'a' through the CTE
3798        // (previously failed with "Cannot find column 'a' in query")
3799        assert!(
3800            !node.downstream.is_empty(),
3801            "Expected downstream nodes tracing through CTE, got none"
3802        );
3803    }
3804
3805    #[test]
3806    fn test_lineage_cte_select_star_renamed_column() {
3807        // dbt standard pattern: CTE with column rename + outer SELECT *
3808        // This is the primary use case for dbt projects (jaffle-shop etc.)
3809        let expr =
3810            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3811        let node = lineage("customer_id", &expr, None, false).unwrap();
3812
3813        assert_eq!(node.name, "customer_id");
3814        // Should trace customer_id → renamed CTE → source.id
3815        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3816        assert!(
3817            all_names.len() >= 2,
3818            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3819            all_names
3820        );
3821    }
3822
3823    #[test]
3824    fn test_lineage_cte_select_star_multiple_columns() {
3825        // CTE exposes multiple columns, outer SELECT * should resolve each
3826        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3827
3828        for col in &["a", "b", "c"] {
3829            let node = lineage(col, &expr, None, false).unwrap();
3830            assert_eq!(node.name, *col);
3831            // Verify lineage resolves without error (star expanded to explicit columns)
3832            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3833            assert!(
3834                all_names.len() >= 2,
3835                "Expected at least 2 nodes for column {}, got: {:?}",
3836                col,
3837                all_names
3838            );
3839        }
3840    }
3841
3842    #[test]
3843    fn test_lineage_nested_cte_select_star() {
3844        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3845        let expr = parse(
3846            "WITH cte1 AS (SELECT a FROM t), \
3847             cte2 AS (SELECT * FROM cte1) \
3848             SELECT * FROM cte2",
3849        );
3850        let node = lineage("a", &expr, None, false).unwrap();
3851
3852        assert_eq!(node.name, "a");
3853        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3854        assert!(
3855            all_names.len() >= 3,
3856            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3857            all_names
3858        );
3859    }
3860
3861    #[test]
3862    fn test_lineage_three_level_nested_cte_star() {
3863        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3864        let expr = parse(
3865            "WITH cte1 AS (SELECT x FROM t), \
3866             cte2 AS (SELECT * FROM cte1), \
3867             cte3 AS (SELECT * FROM cte2) \
3868             SELECT * FROM cte3",
3869        );
3870        let node = lineage("x", &expr, None, false).unwrap();
3871
3872        assert_eq!(node.name, "x");
3873        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3874        assert!(
3875            all_names.len() >= 4,
3876            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3877            all_names
3878        );
3879    }
3880
3881    #[test]
3882    fn test_lineage_cte_union_star() {
3883        // CTE with UNION body, outer SELECT * should resolve from left branch
3884        let expr = parse(
3885            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3886             SELECT * FROM cte",
3887        );
3888        let node = lineage("a", &expr, None, false).unwrap();
3889
3890        assert_eq!(node.name, "a");
3891        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3892        assert!(
3893            all_names.len() >= 2,
3894            "Expected at least 2 nodes for CTE union star, got: {:?}",
3895            all_names
3896        );
3897    }
3898
3899    #[test]
3900    fn test_lineage_cte_star_unknown_table() {
3901        // When CTE references an unknown table, star expansion is skipped gracefully
3902        // and lineage falls back to normal resolution (which may fail)
3903        let expr = parse(
3904            "WITH cte AS (SELECT * FROM unknown_table) \
3905             SELECT * FROM cte",
3906        );
3907        // This should not panic — it may succeed or fail depending on resolution,
3908        // but should not crash
3909        let _result = lineage("x", &expr, None, false);
3910    }
3911
3912    #[test]
3913    fn test_lineage_cte_explicit_columns() {
3914        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3915        let expr = parse(
3916            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3917             SELECT * FROM cte",
3918        );
3919        let node = lineage("x", &expr, None, false).unwrap();
3920        assert_eq!(node.name, "x");
3921    }
3922
3923    #[test]
3924    fn test_lineage_cte_qualified_star() {
3925        // Qualified star: SELECT cte.* FROM cte
3926        let expr = parse(
3927            "WITH cte AS (SELECT a, b FROM t) \
3928             SELECT cte.* FROM cte",
3929        );
3930        for col in &["a", "b"] {
3931            let node = lineage(col, &expr, None, false).unwrap();
3932            assert_eq!(node.name, *col);
3933            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3934            assert!(
3935                all_names.len() >= 2,
3936                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3937                col,
3938                all_names
3939            );
3940        }
3941    }
3942
3943    #[test]
3944    fn test_lineage_subquery_select_star() {
3945        // Ported from sqlglot: test_select_star
3946        // SELECT x FROM (SELECT * FROM table_a)
3947        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3948        let node = lineage("x", &expr, None, false).unwrap();
3949
3950        assert_eq!(node.name, "x");
3951        assert!(
3952            !node.downstream.is_empty(),
3953            "Expected downstream nodes for subquery with SELECT *, got none"
3954        );
3955    }
3956
3957    #[test]
3958    fn test_lineage_cte_star_with_schema_external_table() {
3959        // CTE references an external table via SELECT * — schema enables expansion
3960        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3961SELECT * FROM orders"#;
3962        let expr = parse(sql);
3963
3964        let mut schema = MappingSchema::new();
3965        let cols = vec![
3966            ("order_id".to_string(), DataType::Unknown),
3967            ("customer_id".to_string(), DataType::Unknown),
3968            ("amount".to_string(), DataType::Unknown),
3969        ];
3970        schema.add_table("stg_orders", &cols, None).unwrap();
3971
3972        let node =
3973            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3974                .unwrap();
3975        assert_eq!(node.name, "order_id");
3976    }
3977
3978    #[test]
3979    fn test_lineage_cte_star_with_schema_three_part_name() {
3980        // CTE references an external table with fully-qualified 3-part name
3981        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3982SELECT * FROM orders"#;
3983        let expr = parse(sql);
3984
3985        let mut schema = MappingSchema::new();
3986        let cols = vec![
3987            ("order_id".to_string(), DataType::Unknown),
3988            ("customer_id".to_string(), DataType::Unknown),
3989        ];
3990        schema
3991            .add_table("db.schema.stg_orders", &cols, None)
3992            .unwrap();
3993
3994        let node = lineage_with_schema(
3995            "customer_id",
3996            &expr,
3997            Some(&schema as &dyn Schema),
3998            None,
3999            false,
4000        )
4001        .unwrap();
4002        assert_eq!(node.name, "customer_id");
4003    }
4004
4005    #[test]
4006    fn test_lineage_cte_star_with_schema_nested() {
4007        // Nested CTEs: outer CTE references inner CTE with SELECT *,
4008        // inner CTE references external table with SELECT *
4009        let sql = r#"WITH
4010            raw AS (SELECT * FROM external_table),
4011            enriched AS (SELECT * FROM raw)
4012        SELECT * FROM enriched"#;
4013        let expr = parse(sql);
4014
4015        let mut schema = MappingSchema::new();
4016        let cols = vec![
4017            ("id".to_string(), DataType::Unknown),
4018            ("name".to_string(), DataType::Unknown),
4019        ];
4020        schema.add_table("external_table", &cols, None).unwrap();
4021
4022        let node =
4023            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4024        assert_eq!(node.name, "name");
4025    }
4026
4027    #[test]
4028    fn test_lineage_cte_qualified_star_with_schema() {
4029        // CTE uses qualified star (orders.*) from a CTE whose columns
4030        // come from an external table via SELECT *
4031        let sql = r#"WITH
4032            orders AS (SELECT * FROM stg_orders),
4033            enriched AS (
4034                SELECT orders.*, 'extra' AS extra
4035                FROM orders
4036            )
4037        SELECT * FROM enriched"#;
4038        let expr = parse(sql);
4039
4040        let mut schema = MappingSchema::new();
4041        let cols = vec![
4042            ("order_id".to_string(), DataType::Unknown),
4043            ("total".to_string(), DataType::Unknown),
4044        ];
4045        schema.add_table("stg_orders", &cols, None).unwrap();
4046
4047        let node =
4048            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4049                .unwrap();
4050        assert_eq!(node.name, "order_id");
4051
4052        // Also verify the extra column works
4053        let extra =
4054            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4055        assert_eq!(extra.name, "extra");
4056    }
4057
4058    #[test]
4059    fn test_lineage_cte_star_without_schema_still_works() {
4060        // Without schema, CTE-to-CTE star expansion still works
4061        let sql = r#"WITH
4062            cte1 AS (SELECT id, name FROM raw_table),
4063            cte2 AS (SELECT * FROM cte1)
4064        SELECT * FROM cte2"#;
4065        let expr = parse(sql);
4066
4067        // No schema — should still resolve through CTE chain
4068        let node = lineage("id", &expr, None, false).unwrap();
4069        assert_eq!(node.name, "id");
4070    }
4071
4072    #[test]
4073    fn test_lineage_nested_cte_star_with_join_and_schema() {
4074        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
4075        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
4076        let sql = r#"WITH
4077base_orders AS (
4078    SELECT * FROM stg_orders
4079),
4080with_payments AS (
4081    SELECT
4082        base_orders.*,
4083        p.amount
4084    FROM base_orders
4085    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4086),
4087final_cte AS (
4088    SELECT * FROM with_payments
4089)
4090SELECT * FROM final_cte"#;
4091        let expr = parse(sql);
4092
4093        let mut schema = MappingSchema::new();
4094        let order_cols = vec![
4095            (
4096                "order_id".to_string(),
4097                crate::expressions::DataType::Unknown,
4098            ),
4099            (
4100                "customer_id".to_string(),
4101                crate::expressions::DataType::Unknown,
4102            ),
4103            ("status".to_string(), crate::expressions::DataType::Unknown),
4104        ];
4105        let pay_cols = vec![
4106            (
4107                "payment_id".to_string(),
4108                crate::expressions::DataType::Unknown,
4109            ),
4110            (
4111                "order_id".to_string(),
4112                crate::expressions::DataType::Unknown,
4113            ),
4114            ("amount".to_string(), crate::expressions::DataType::Unknown),
4115        ];
4116        schema.add_table("stg_orders", &order_cols, None).unwrap();
4117        schema.add_table("stg_payments", &pay_cols, None).unwrap();
4118
4119        // order_id should trace back to stg_orders
4120        let node =
4121            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4122                .unwrap();
4123        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4124
4125        // The leaf should be "stg_orders.order_id" (not just "order_id")
4126        let has_table_qualified = all_names
4127            .iter()
4128            .any(|n| n.contains('.') && n.contains("order_id"));
4129        assert!(
4130            has_table_qualified,
4131            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4132            all_names
4133        );
4134
4135        // amount should trace back to stg_payments
4136        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4137            .unwrap();
4138        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4139
4140        let has_table_qualified = all_names
4141            .iter()
4142            .any(|n| n.contains('.') && n.contains("amount"));
4143        assert!(
4144            has_table_qualified,
4145            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4146            all_names
4147        );
4148    }
4149
4150    #[test]
4151    fn test_lineage_cte_alias_resolution() {
4152        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
4153        let sql = r#"WITH import_stg_items AS (
4154    SELECT item_id, name, status FROM stg_items
4155)
4156SELECT base.item_id, base.status
4157FROM import_stg_items AS base"#;
4158        let expr = parse(sql);
4159
4160        let node = lineage("item_id", &expr, None, false).unwrap();
4161        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4162        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
4163        assert!(
4164            all_names.iter().any(|n| n == "stg_items.item_id"),
4165            "Expected leaf 'stg_items.item_id', got: {:?}",
4166            all_names
4167        );
4168    }
4169
4170    #[test]
4171    fn test_lineage_cte_alias_with_schema_and_star() {
4172        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
4173        let sql = r#"WITH import_stg AS (
4174    SELECT * FROM stg_items
4175)
4176SELECT base.item_id, base.status
4177FROM import_stg AS base"#;
4178        let expr = parse(sql);
4179
4180        let mut schema = MappingSchema::new();
4181        schema
4182            .add_table(
4183                "stg_items",
4184                &[
4185                    ("item_id".to_string(), DataType::Unknown),
4186                    ("name".to_string(), DataType::Unknown),
4187                    ("status".to_string(), DataType::Unknown),
4188                ],
4189                None,
4190            )
4191            .unwrap();
4192
4193        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4194            .unwrap();
4195        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4196        assert!(
4197            all_names.iter().any(|n| n == "stg_items.item_id"),
4198            "Expected leaf 'stg_items.item_id', got: {:?}",
4199            all_names
4200        );
4201    }
4202
4203    #[test]
4204    fn test_lineage_cte_alias_with_join() {
4205        // Multiple CTE aliases in a JOIN: each should resolve independently
4206        let sql = r#"WITH
4207    import_users AS (SELECT id, name FROM users),
4208    import_orders AS (SELECT id, user_id, amount FROM orders)
4209SELECT u.name, o.amount
4210FROM import_users AS u
4211LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4212        let expr = parse(sql);
4213
4214        let node = lineage("name", &expr, None, false).unwrap();
4215        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4216        assert!(
4217            all_names.iter().any(|n| n == "users.name"),
4218            "Expected leaf 'users.name', got: {:?}",
4219            all_names
4220        );
4221
4222        let node = lineage("amount", &expr, None, false).unwrap();
4223        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4224        assert!(
4225            all_names.iter().any(|n| n == "orders.amount"),
4226            "Expected leaf 'orders.amount', got: {:?}",
4227            all_names
4228        );
4229    }
4230
4231    // -----------------------------------------------------------------------
4232    // Quoted CTE name tests — verifying SQL identifier case semantics
4233    // -----------------------------------------------------------------------
4234
4235    #[test]
4236    fn test_lineage_unquoted_cte_case_insensitive() {
4237        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
4238        // MyCte and MYCTE should match.
4239        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4240        let node = lineage("col", &expr, None, false).unwrap();
4241        assert_eq!(node.name, "col");
4242        assert!(
4243            !node.downstream.is_empty(),
4244            "Unquoted CTE should resolve case-insensitively"
4245        );
4246    }
4247
4248    #[test]
4249    fn test_lineage_quoted_cte_case_preserved() {
4250        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
4251        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4252        let node = lineage("col", &expr, None, false).unwrap();
4253        assert_eq!(node.name, "col");
4254        assert!(
4255            !node.downstream.is_empty(),
4256            "Quoted CTE with matching case should resolve"
4257        );
4258    }
4259
4260    #[test]
4261    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4262        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
4263        // sqlglot treats this as a table reference, not a CTE match.
4264        // Star expansion should NOT resolve through the CTE.
4265        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4266        // lineage("col", ...) should fail because "mycte" is treated as an external
4267        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
4268        let result = lineage("col", &expr, None, false);
4269        assert!(
4270            result.is_err(),
4271            "Quoted CTE with case mismatch should not expand star: {:?}",
4272            result
4273        );
4274    }
4275
4276    #[test]
4277    fn test_lineage_mixed_quoted_unquoted_cte() {
4278        // Mix of unquoted and quoted CTEs in a nested chain.
4279        let expr = parse(
4280            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4281        );
4282        let node = lineage("a", &expr, None, false).unwrap();
4283        assert_eq!(node.name, "a");
4284        assert!(
4285            !node.downstream.is_empty(),
4286            "Mixed quoted/unquoted CTE chain should resolve"
4287        );
4288    }
4289
4290    // -----------------------------------------------------------------------
4291    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
4292    // -----------------------------------------------------------------------
4293    //
4294    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
4295    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
4296    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
4297    // direct string comparison for CTE name matching, ignoring the quoted status.
4298    //
4299    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
4300    // and unquoted as case-insensitive. The scope system should do the same.
4301    //
4302    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
4303    // which is broader than the star expansion scope of this PR.
4304
4305    #[test]
4306    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4307        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
4308        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
4309        // as "mycte" incorrectly resolves to the CTE.
4310        //
4311        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
4312        // case-sensitive: "mycte" should NOT match CTE "MyCte".
4313        //
4314        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4315        // this test should fail — update the assertion to match correct behavior:
4316        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
4317        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4318        let node = lineage("col", &expr, None, false).unwrap();
4319        assert!(!node.downstream.is_empty());
4320        let child = &node.downstream[0];
4321        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
4322        assert_eq!(
4323            child.source_name, "MyCte",
4324            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4325             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4326        );
4327    }
4328
4329    #[test]
4330    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4331        // Known bug: same as above but with qualified column reference ("mycte".col).
4332        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
4333        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
4334        //
4335        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4336        // this test should fail — update to assert source_name != "MyCte".
4337        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4338        let node = lineage("col", &expr, None, false).unwrap();
4339        assert!(!node.downstream.is_empty());
4340        let child = &node.downstream[0];
4341        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
4342        assert_eq!(
4343            child.source_name, "MyCte",
4344            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4345             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4346        );
4347    }
4348
4349    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
4350
4351    /// sqlglot: test_node_name_doesnt_contain_comment
4352    /// Comments in column expressions should not affect lineage resolution.
4353    /// NOTE: This test uses SELECT * from a derived table, which is a separate
4354    /// known limitation in polyglot-sql (star expansion in subqueries).
4355    #[test]
4356    #[ignore = "requires derived table star expansion (separate issue)"]
4357    fn test_node_name_doesnt_contain_comment() {
4358        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4359        let node = lineage("x", &expr, None, false).unwrap();
4360
4361        assert_eq!(node.name, "x");
4362        assert!(!node.downstream.is_empty());
4363    }
4364
4365    /// A line comment between SELECT and the first column wraps the column
4366    /// in an Annotated node. Lineage must unwrap it to find the column name.
4367    /// Verify that commented and uncommented queries produce identical lineage.
4368    #[test]
4369    fn test_comment_before_first_column_in_cte() {
4370        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
4371        let sql_without_comment = "with t as (select 1 as a) select a from t";
4372
4373        // Without comment — baseline
4374        let expr_ok = parse(sql_without_comment);
4375        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4376
4377        // With comment — should produce identical lineage
4378        let expr_comment = parse(sql_with_comment);
4379        let node_comment = lineage("a", &expr_comment, None, false)
4380            .expect("with comment before first column should succeed");
4381
4382        assert_eq!(node_ok.name, node_comment.name, "node names should match");
4383        assert_eq!(
4384            node_ok.downstream_names(),
4385            node_comment.downstream_names(),
4386            "downstream lineage should be identical with or without comment"
4387        );
4388    }
4389
4390    /// Block comment between SELECT and first column.
4391    #[test]
4392    fn test_block_comment_before_first_column() {
4393        let sql = "with t as (select 1 as a) select /* section */ a from t";
4394        let expr = parse(sql);
4395        let node = lineage("a", &expr, None, false)
4396            .expect("block comment before first column should succeed");
4397        assert_eq!(node.name, "a");
4398        assert!(
4399            !node.downstream.is_empty(),
4400            "should have downstream lineage"
4401        );
4402    }
4403
4404    /// Comment before first column should not affect second column resolution.
4405    #[test]
4406    fn test_comment_before_first_column_second_col_ok() {
4407        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
4408        let expr = parse(sql);
4409
4410        let node_a =
4411            lineage("a", &expr, None, false).expect("column a with comment should succeed");
4412        assert_eq!(node_a.name, "a");
4413
4414        let node_b =
4415            lineage("b", &expr, None, false).expect("column b with comment should succeed");
4416        assert_eq!(node_b.name, "b");
4417    }
4418
4419    /// Aliased column with preceding comment.
4420    #[test]
4421    fn test_comment_before_aliased_column() {
4422        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
4423        let expr = parse(sql);
4424        let node =
4425            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4426        assert_eq!(node.name, "y");
4427        assert!(
4428            !node.downstream.is_empty(),
4429            "aliased column should have downstream lineage"
4430        );
4431    }
4432}