Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Semantic source kind for downstream consumers.
33    pub source_kind: SourceKind,
34    /// User-written source alias when different from canonical source name.
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub source_alias: Option<String>,
37    /// Optional reference node name (e.g., for CTEs)
38    pub reference_node_name: String,
39}
40
41impl LineageNode {
42    /// Create a new lineage node
43    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44        Self {
45            name: name.into(),
46            expression,
47            source,
48            downstream: Vec::new(),
49            source_name: String::new(),
50            source_kind: SourceKind::Unknown,
51            source_alias: None,
52            reference_node_name: String::new(),
53        }
54    }
55
56    /// Iterate over all nodes in the lineage graph using DFS
57    pub fn walk(&self) -> LineageWalker<'_> {
58        LineageWalker { stack: vec![self] }
59    }
60
61    /// Get all downstream column names
62    pub fn downstream_names(&self) -> Vec<String> {
63        self.downstream.iter().map(|n| n.name.clone()).collect()
64    }
65}
66
67fn source_kind_for_scope_context(
68    scope: &Scope,
69    source_name: &str,
70    reference_node_name: &str,
71) -> SourceKind {
72    if source_name.is_empty() && reference_node_name.is_empty() {
73        return SourceKind::Root;
74    }
75    if let Some(source_info) = scope.sources.get(source_name) {
76        return source_info.kind;
77    }
78    if scope.cte_sources.contains_key(source_name) {
79        return SourceKind::Cte;
80    }
81    match scope.scope_type {
82        ScopeType::Cte => SourceKind::Cte,
83        ScopeType::DerivedTable => SourceKind::DerivedTable,
84        ScopeType::Udtf => SourceKind::Virtual,
85        _ => SourceKind::Unknown,
86    }
87}
88
89fn apply_scope_context(
90    node: &mut LineageNode,
91    scope: &Scope,
92    source_name: &str,
93    reference_node_name: &str,
94) {
95    node.source_name = source_name.to_string();
96    node.reference_node_name = reference_node_name.to_string();
97    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
100/// Iterator for walking the lineage graph
101pub struct LineageWalker<'a> {
102    stack: Vec<&'a LineageNode>,
103}
104
105impl<'a> Iterator for LineageWalker<'a> {
106    type Item = &'a LineageNode;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        if let Some(node) = self.stack.pop() {
110            // Add children in reverse order so they're visited in order
111            for child in node.downstream.iter().rev() {
112                self.stack.push(child);
113            }
114            Some(node)
115        } else {
116            None
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// ColumnRef: name or positional index for column lookup
123// ---------------------------------------------------------------------------
124
/// Identifies the column to trace: either by its output name or by its
/// position in the select list (`Index(0)` is the first column, as used when
/// tracing into scalar subqueries).
enum ColumnRef<'a> {
    /// Look the column up by output name.
    Name(&'a str),
    /// Look the column up by position.
    Index(usize),
}
130
131// ---------------------------------------------------------------------------
132// Public API
133// ---------------------------------------------------------------------------
134
135/// Build the lineage graph for a column in a SQL query
136///
137/// # Arguments
138/// * `column` - The column name to trace lineage for
139/// * `sql` - The SQL expression (SELECT, UNION, etc.)
140/// * `dialect` - Optional dialect for parsing
141/// * `trim_selects` - If true, trim the source SELECT to only include the target column
142///
143/// # Returns
144/// The root lineage node for the specified column
145///
146/// # Example
147/// ```ignore
148/// use polyglot_sql::lineage::lineage;
149/// use polyglot_sql::parse_one;
150/// use polyglot_sql::DialectType;
151///
152/// let sql = "SELECT a, b + 1 AS c FROM t";
153/// let expr = parse_one(sql, DialectType::Generic).unwrap();
154/// let node = lineage("c", &expr, None, false).unwrap();
155/// ```
156pub fn lineage(
157    column: &str,
158    sql: &Expression,
159    dialect: Option<DialectType>,
160    trim_selects: bool,
161) -> Result<LineageNode> {
162    // Fast path: skip clone when there are no CTEs to expand
163    let effective_sql = lineage_effective_expression(sql);
164    let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165    if !has_with {
166        return lineage_from_expression(column, sql, dialect, trim_selects);
167    }
168    let mut owned = sql.clone();
169    expand_cte_stars(&mut owned, None);
170    lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173/// Build the lineage graph for a column in a SQL query using optional schema metadata.
174///
175/// When `schema` is provided, the query is first qualified with
176/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
177/// ambiguous column references.
178///
179/// # Arguments
180/// * `column` - The column name to trace lineage for
181/// * `sql` - The SQL expression (SELECT, UNION, etc.)
182/// * `schema` - Optional schema used for qualification
183/// * `dialect` - Optional dialect for qualification and lineage handling
184/// * `trim_selects` - If true, trim the source SELECT to only include the target column
185///
186/// # Returns
187/// The root lineage node for the specified column
188pub fn lineage_with_schema(
189    column: &str,
190    sql: &Expression,
191    schema: Option<&dyn Schema>,
192    dialect: Option<DialectType>,
193    trim_selects: bool,
194) -> Result<LineageNode> {
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new().with_dialect(dialect_type)
198        } else {
199            QualifyColumnsOptions::new()
200        };
201
202        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203            Error::internal(format!("Lineage qualification failed with schema: {}", e))
204        })?
205    } else {
206        sql.clone()
207    };
208
209    // Annotate types in-place so lineage nodes carry type information
210    annotate_types(&mut qualified_expression, schema, dialect);
211
212    // Expand CTE stars on the already-owned expression (no extra clone).
213    // Pass schema so that stars from external tables can also be resolved.
214    expand_cte_stars(&mut qualified_expression, schema);
215
216    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220    column: &str,
221    sql: &Expression,
222    dialect: Option<DialectType>,
223    trim_selects: bool,
224) -> Result<LineageNode> {
225    let sql = lineage_effective_expression(sql);
226    let scope = build_scope(sql);
227    to_node(
228        ColumnRef::Name(column),
229        &scope,
230        dialect,
231        "",
232        "",
233        "",
234        trim_selects,
235    )
236}
237
238pub(crate) fn lineage_by_index_from_expression(
239    column_index: usize,
240    sql: &Expression,
241    dialect: Option<DialectType>,
242    trim_selects: bool,
243) -> Result<LineageNode> {
244    let sql = lineage_effective_expression(sql);
245    let scope = build_scope(sql);
246    to_node(
247        ColumnRef::Index(column_index),
248        &scope,
249        dialect,
250        "",
251        "",
252        "",
253        trim_selects,
254    )
255}
256
257fn lineage_effective_expression(sql: &Expression) -> &Expression {
258    match sql {
259        Expression::Prepare(prepare) => &prepare.statement,
260        _ => sql,
261    }
262}
263
264// ---------------------------------------------------------------------------
265// CTE star expansion
266// ---------------------------------------------------------------------------
267
268/// Normalize an identifier for CTE name matching.
269///
270/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
271/// quoted identifiers preserve their original case. This matches sqlglot's
272/// `normalize_identifiers` behavior.
273fn normalize_cte_name(ident: &Identifier) -> String {
274    if ident.quoted {
275        ident.name.clone()
276    } else {
277        ident.name.to_lowercase()
278    }
279}
280
281/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
282/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
283/// which qualify_columns cannot resolve because it processes each SELECT independently.
284///
285/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
286/// by looking up column names in the schema. This enables correct expansion of patterns
287/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
288///
289/// CTE name matching follows SQL identifier semantics: unquoted names are compared
290/// case-insensitively (lowercased), while quoted names preserve their original case.
291/// This matches sqlglot's `normalize_identifiers` behavior.
292pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
293    if let Expression::Prepare(prepare) = expr {
294        expand_cte_stars(&mut prepare.statement, schema);
295        return;
296    }
297
298    let select = match expr {
299        Expression::Select(s) => s,
300        _ => return,
301    };
302
303    let with = match &mut select.with {
304        Some(w) => w,
305        None => return,
306    };
307
308    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
309
310    for cte in &mut with.ctes {
311        let cte_name = normalize_cte_name(&cte.alias);
312
313        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
314        if !cte.columns.is_empty() {
315            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
316            resolved_cte_columns.insert(cte_name, cols);
317            continue;
318        }
319
320        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
321        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
322        // references itself. We detect this conservatively: if the CTE name appears as
323        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
324        // block are still expanded.
325        if with.recursive {
326            let is_self_referencing =
327                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
328                    let body_sources = get_select_sources(body_select);
329                    body_sources.iter().any(|s| s.normalized == cte_name)
330                } else {
331                    false
332                };
333            if is_self_referencing {
334                continue;
335            }
336        }
337
338        // Get the SELECT from the CTE body (handle UNION by taking left branch)
339        let body_select = match get_leftmost_select_mut(&mut cte.this) {
340            Some(s) => s,
341            None => continue,
342        };
343
344        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
345        resolved_cte_columns.insert(cte_name, columns);
346    }
347
348    // Also expand stars in the outer SELECT itself
349    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
350}
351
352/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
353///
354/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
355/// are determined by the left branch. This matches sqlglot's behavior.
356fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
357    let mut current = expr;
358    for _ in 0..MAX_LINEAGE_DEPTH {
359        match current {
360            Expression::Select(s) => return Some(s),
361            Expression::Union(u) => current = &mut u.left,
362            Expression::Intersect(i) => current = &mut i.left,
363            Expression::Except(e) => current = &mut e.left,
364            Expression::Paren(p) => current = &mut p.this,
365            _ => return None,
366        }
367    }
368    None
369}
370
371/// Rewrite star expressions in a SELECT using resolved CTE column lists.
372/// Falls back to `schema` for external table column lookup.
373/// Returns the list of output column names after expansion.
374fn rewrite_stars_in_select(
375    select: &mut Select,
376    resolved_ctes: &HashMap<String, Vec<String>>,
377    schema: Option<&dyn Schema>,
378) -> Vec<String> {
379    // The AST represents star expressions in two forms depending on syntax:
380    //   - `SELECT *`      → Expression::Star (unqualified star)
381    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
382    // Both must be checked to handle all star patterns.
383    let has_star = select
384        .expressions
385        .iter()
386        .any(|e| matches!(e, Expression::Star(_)));
387    let has_qualified_star = select
388        .expressions
389        .iter()
390        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
391
392    if !has_star && !has_qualified_star {
393        // No stars — just extract column names without rewriting
394        return select
395            .expressions
396            .iter()
397            .filter_map(get_expression_output_name)
398            .collect();
399    }
400
401    let sources = get_select_sources(select);
402    let mut new_expressions = Vec::new();
403    let mut result_columns = Vec::new();
404
405    for expr in &select.expressions {
406        match expr {
407            Expression::Star(star) => {
408                let qual = star.table.as_ref();
409                if let Some(expanded) =
410                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
411                {
412                    for (src_alias, col_name) in &expanded {
413                        let table_id = Identifier::new(src_alias);
414                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
415                        result_columns.push(col_name.clone());
416                    }
417                } else {
418                    new_expressions.push(expr.clone());
419                    result_columns.push("*".to_string());
420                }
421            }
422            Expression::Column(c) if c.name.name == "*" => {
423                let qual = c.table.as_ref();
424                if let Some(expanded) =
425                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
426                {
427                    for (_src_alias, col_name) in &expanded {
428                        // Keep the original table qualifier for qualified stars (table.*)
429                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
430                        result_columns.push(col_name.clone());
431                    }
432                } else {
433                    new_expressions.push(expr.clone());
434                    result_columns.push("*".to_string());
435                }
436            }
437            _ => {
438                new_expressions.push(expr.clone());
439                if let Some(name) = get_expression_output_name(expr) {
440                    result_columns.push(name);
441                }
442            }
443        }
444    }
445
446    select.expressions = new_expressions;
447    result_columns
448}
449
450/// Try to expand a star expression by looking up source columns from resolved CTEs,
451/// falling back to the schema for external tables.
452/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
453/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
454fn expand_star_from_sources(
455    qualifier: Option<&Identifier>,
456    sources: &[SourceInfo],
457    resolved_ctes: &HashMap<String, Vec<String>>,
458    schema: Option<&dyn Schema>,
459) -> Option<Vec<(String, String)>> {
460    let mut expanded = Vec::new();
461
462    if let Some(qual) = qualifier {
463        // Qualified star: table.*
464        let qual_normalized = normalize_cte_name(qual);
465        for src in sources {
466            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
467                // Try CTE first
468                if let Some(cols) = resolved_ctes.get(&src.normalized) {
469                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
470                    return Some(expanded);
471                }
472                // Fall back to schema
473                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
474                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
475                    return Some(expanded);
476                }
477            }
478        }
479        None
480    } else {
481        // Unqualified star: expand all sources.
482        // Intentionally conservative: if any source can't be resolved, the entire
483        // expansion is aborted. Partial expansion would produce an incomplete column
484        // list, causing downstream lineage resolution to silently omit columns.
485        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
486        let mut any_expanded = false;
487        for src in sources {
488            if let Some(cols) = resolved_ctes.get(&src.normalized) {
489                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
490                any_expanded = true;
491            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
492                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
493                any_expanded = true;
494            } else {
495                return None;
496            }
497        }
498        if any_expanded {
499            Some(expanded)
500        } else {
501            None
502        }
503    }
504}
505
506/// Look up column names for a table from the schema.
507fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
508    let schema = schema?;
509    if fq_name.is_empty() {
510        return None;
511    }
512    schema
513        .column_names(fq_name)
514        .ok()
515        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
516}
517
518/// Create a Column expression with the given name and optional table qualifier.
519fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
520    Expression::Column(Box::new(crate::expressions::Column {
521        name: Identifier::new(name),
522        table: table.cloned(),
523        join_mark: false,
524        trailing_comments: Vec::new(),
525        span: None,
526        inferred_type: None,
527    }))
528}
529
530/// Extract the output name of a SELECT expression.
531fn get_expression_output_name(expr: &Expression) -> Option<String> {
532    match expr {
533        Expression::Alias(a) => Some(a.alias.name.clone()),
534        Expression::Column(c) => Some(c.name.name.clone()),
535        Expression::Identifier(id) => Some(id.name.clone()),
536        Expression::Star(_) => Some("*".to_string()),
537        _ => None,
538    }
539}
540
/// One source of a SELECT (from FROM, JOIN or LATERAL VIEW), as collected by
/// `get_select_sources`.
struct SourceInfo {
    /// Name the query uses for the source: its alias, or the bare table name.
    alias: String,
    /// Whether the source was introduced through a quoted identifier.
    ///
    /// The schema-less star passthrough heuristic has to stay conservative for
    /// quoted sources. An unresolved quoted table name may differ from a
    /// similarly named CTE that older scope paths still compare
    /// case-insensitively.
    quoted: bool,
    /// Key for CTE lookups: lowercased when unquoted, verbatim when quoted.
    normalized: String,
    /// Dotted name used for schema lookups (e.g. `"db.schema.table"`).
    fq_name: String,
}
556
557/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
558/// SELECT's FROM and JOIN clauses in a single pass.
559fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
560    let mut sources = Vec::new();
561
562    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
563        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
564            SourceInfo {
565                alias: alias.name.clone(),
566                quoted: alias.quoted,
567                normalized: normalize_cte_name(alias),
568                fq_name: alias.name.clone(),
569            }
570        }
571
572        fn named_virtual_source_info(alias: &str) -> SourceInfo {
573            SourceInfo {
574                alias: alias.to_string(),
575                quoted: false,
576                normalized: alias.to_lowercase(),
577                fq_name: alias.to_string(),
578            }
579        }
580
581        match expr {
582            Expression::Table(t) => {
583                let normalized = normalize_cte_name(&t.name);
584                let alias = t
585                    .alias
586                    .as_ref()
587                    .map(|a| a.name.clone())
588                    .unwrap_or_else(|| t.name.name.clone());
589                let mut parts = Vec::new();
590                if let Some(catalog) = &t.catalog {
591                    parts.push(catalog.name.clone());
592                }
593                if let Some(schema) = &t.schema {
594                    parts.push(schema.name.clone());
595                }
596                parts.push(t.name.name.clone());
597                let fq_name = parts.join(".");
598                Some(SourceInfo {
599                    alias,
600                    quoted: t.name.quoted,
601                    normalized,
602                    fq_name,
603                })
604            }
605            Expression::Subquery(s) => {
606                let alias_identifier = s.alias.as_ref()?;
607                let alias = alias_identifier.name.clone();
608                let normalized = alias.to_lowercase();
609                let fq_name = alias.clone();
610                Some(SourceInfo {
611                    alias,
612                    quoted: alias_identifier.quoted,
613                    normalized,
614                    fq_name,
615                })
616            }
617            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
618            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
619                Some(virtual_source_info(&a.alias))
620            }
621            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
622            Expression::LateralView(lateral_view) => lateral_view
623                .table_alias
624                .as_ref()
625                .or_else(|| lateral_view.column_aliases.first())
626                .map(virtual_source_info),
627            Expression::Paren(p) => extract_source(&p.this),
628            _ => None,
629        }
630    }
631
632    if let Some(from) = &select.from {
633        for expr in &from.expressions {
634            if let Some(info) = extract_source(expr) {
635                sources.push(info);
636            }
637        }
638    }
639    for join in &select.joins {
640        if let Some(info) = extract_source(&join.this) {
641            sources.push(info);
642        }
643    }
644    for lateral_view in &select.lateral_views {
645        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
646        {
647            sources.push(info);
648        }
649    }
650    sources
651}
652
653/// Get all source tables from a lineage graph
654pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
655    let mut tables = HashSet::new();
656    collect_source_tables(node, &mut tables);
657    tables
658}
659
660/// Recursively collect source table names from lineage graph
661pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
662    if let Expression::Table(table) = &node.source {
663        tables.insert(table.name.name.clone());
664    }
665    for child in &node.downstream {
666        collect_source_tables(child, tables);
667    }
668}
669
670// ---------------------------------------------------------------------------
671// Core recursive lineage builder
672// ---------------------------------------------------------------------------
673
/// Upper bound on lineage recursion depth, guarding against stack overflow on
/// circular or very deeply nested CTE chains. It also caps how many steps
/// `get_leftmost_select_mut` descends through set operations and parentheses.
const MAX_LINEAGE_DEPTH: usize = 64;
677
678/// Recursively build a lineage node for a column in a scope.
679fn to_node(
680    column: ColumnRef<'_>,
681    scope: &Scope,
682    dialect: Option<DialectType>,
683    scope_name: &str,
684    source_name: &str,
685    reference_node_name: &str,
686    trim_selects: bool,
687) -> Result<LineageNode> {
688    to_node_inner(
689        column,
690        scope,
691        dialect,
692        scope_name,
693        source_name,
694        reference_node_name,
695        trim_selects,
696        &[],
697        0,
698    )
699}
700
701fn to_node_inner(
702    column: ColumnRef<'_>,
703    scope: &Scope,
704    dialect: Option<DialectType>,
705    scope_name: &str,
706    source_name: &str,
707    reference_node_name: &str,
708    trim_selects: bool,
709    ancestor_cte_scopes: &[Scope],
710    depth: usize,
711) -> Result<LineageNode> {
712    if depth > MAX_LINEAGE_DEPTH {
713        return Err(Error::internal(format!(
714            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
715        )));
716    }
717    let scope_expr = &scope.expression;
718
719    // Build combined CTE scopes: current scope's cte_scopes + ancestors
720    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
721    for s in ancestor_cte_scopes {
722        all_cte_scopes.push(s);
723    }
724
725    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
726    //    but we need the inner query (SELECT/UNION) for column lookup.
727    let effective_expr = match scope_expr {
728        Expression::Cte(cte) => &cte.this,
729        other => other,
730    };
731
732    // 1. Set operations (UNION / INTERSECT / EXCEPT)
733    if matches!(
734        effective_expr,
735        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
736    ) {
737        // For CTE wrapping a set op, create a temporary scope with the inner expression
738        if matches!(scope_expr, Expression::Cte(_)) {
739            let mut inner_scope = Scope::new(effective_expr.clone());
740            inner_scope.union_scopes = scope.union_scopes.clone();
741            inner_scope.sources = scope.sources.clone();
742            inner_scope.cte_sources = scope.cte_sources.clone();
743            inner_scope.cte_scopes = scope.cte_scopes.clone();
744            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
745            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
746            return handle_set_operation(
747                &column,
748                &inner_scope,
749                dialect,
750                scope_name,
751                source_name,
752                reference_node_name,
753                trim_selects,
754                ancestor_cte_scopes,
755                depth,
756            );
757        }
758        return handle_set_operation(
759            &column,
760            scope,
761            dialect,
762            scope_name,
763            source_name,
764            reference_node_name,
765            trim_selects,
766            ancestor_cte_scopes,
767            depth,
768        );
769    }
770
771    // 2. Find the select expression for this column
772    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
773    let column_name = resolve_column_name(&column, &select_expr);
774
775    // 3. Trim source if requested
776    let node_source = if trim_selects {
777        trim_source(effective_expr, &select_expr)
778    } else {
779        effective_expr.clone()
780    };
781
782    // 4. Create the lineage node
783    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
784    apply_scope_context(&mut node, scope, source_name, reference_node_name);
785
786    // 5. Star handling — add downstream for each source
787    if let Expression::Star(star) = &select_expr {
788        let star_table = star
789            .table
790            .as_ref()
791            .map(|identifier| identifier.name.as_str());
792        for (name, source_info) in &scope.sources {
793            if let Some(star_table) = star_table {
794                let table_matches = name.eq_ignore_ascii_case(star_table)
795                    || source_info
796                        .alias
797                        .as_deref()
798                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
799                    || matches!(
800                        &source_info.expression,
801                        Expression::Table(table_ref)
802                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
803                    );
804                if !table_matches {
805                    continue;
806                }
807            }
808
809            let mut child = LineageNode::new(
810                format!("{}.*", name),
811                Expression::Star(crate::expressions::Star {
812                    table: star.table.clone(),
813                    except: None,
814                    replace: None,
815                    rename: None,
816                    trailing_comments: vec![],
817                    span: None,
818                }),
819                source_info.expression.clone(),
820            );
821            apply_source_info_context(&mut child, name, source_info);
822            node.downstream.push(child);
823        }
824        return Ok(node);
825    }
826
827    // 6. Subqueries in select — trace through scalar subqueries
828    let subqueries: Vec<&Expression> =
829        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
830    for sq_expr in subqueries {
831        if let Expression::Subquery(sq) = sq_expr {
832            for sq_scope in &scope.subquery_scopes {
833                if sq_scope.expression == sq.this {
834                    if let Ok(child) = to_node_inner(
835                        ColumnRef::Index(0),
836                        sq_scope,
837                        dialect,
838                        &column_name,
839                        "",
840                        "",
841                        trim_selects,
842                        ancestor_cte_scopes,
843                        depth + 1,
844                    ) {
845                        node.downstream.push(child);
846                    }
847                    break;
848                }
849            }
850        }
851    }
852
853    // 7. Column references — trace each column to its source
854    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
855    for col_ref in col_refs {
856        let col_name = &col_ref.column;
857        if let Some(ref table_id) = col_ref.table {
858            let tbl = &table_id.name;
859            resolve_qualified_column(
860                &mut node,
861                scope,
862                dialect,
863                tbl,
864                col_name,
865                &column_name,
866                trim_selects,
867                &all_cte_scopes,
868                depth,
869            );
870        } else {
871            resolve_unqualified_column(
872                &mut node,
873                scope,
874                dialect,
875                col_name,
876                &column_name,
877                trim_selects,
878                &all_cte_scopes,
879                depth,
880            );
881        }
882    }
883
884    Ok(node)
885}
886
887// ---------------------------------------------------------------------------
888// Set operation handling
889// ---------------------------------------------------------------------------
890
891fn handle_set_operation(
892    column: &ColumnRef<'_>,
893    scope: &Scope,
894    dialect: Option<DialectType>,
895    scope_name: &str,
896    source_name: &str,
897    reference_node_name: &str,
898    trim_selects: bool,
899    ancestor_cte_scopes: &[Scope],
900    depth: usize,
901) -> Result<LineageNode> {
902    let scope_expr = &scope.expression;
903
904    // Determine column index
905    let col_index = match column {
906        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
907        ColumnRef::Index(i) => *i,
908    };
909
910    let col_name = match column {
911        ColumnRef::Name(name) => name.to_string(),
912        ColumnRef::Index(_) => format!("_{col_index}"),
913    };
914
915    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
916    apply_scope_context(&mut node, scope, source_name, reference_node_name);
917
918    // Recurse into each union branch
919    for branch_scope in &scope.union_scopes {
920        if let Ok(child) = to_node_inner(
921            ColumnRef::Index(col_index),
922            branch_scope,
923            dialect,
924            scope_name,
925            "",
926            "",
927            trim_selects,
928            ancestor_cte_scopes,
929            depth + 1,
930        ) {
931            node.downstream.push(child);
932        }
933    }
934
935    Ok(node)
936}
937
938// ---------------------------------------------------------------------------
939// Column resolution helpers
940// ---------------------------------------------------------------------------
941
942fn resolve_qualified_column(
943    node: &mut LineageNode,
944    scope: &Scope,
945    dialect: Option<DialectType>,
946    table: &str,
947    col_name: &str,
948    parent_name: &str,
949    trim_selects: bool,
950    all_cte_scopes: &[&Scope],
951    depth: usize,
952) {
953    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
954    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
955    let resolved_cte_name = resolve_cte_alias(scope, table);
956    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
957
958    // Check if table is a CTE reference — check both the current scope's cte_sources
959    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
960    let is_cte = scope.cte_sources.contains_key(effective_table)
961        || all_cte_scopes.iter().any(
962            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
963        );
964    if is_cte {
965        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
966            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
967            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
968            if let Ok(child) = to_node_inner(
969                ColumnRef::Name(col_name),
970                child_scope,
971                dialect,
972                parent_name,
973                effective_table,
974                parent_name,
975                trim_selects,
976                &ancestors,
977                depth + 1,
978            ) {
979                node.downstream.push(child);
980                return;
981            }
982        }
983    }
984
985    // Check if table is a derived table (is_scope = true in sources)
986    if let Some(source_info) = scope.sources.get(table) {
987        if source_info.is_scope {
988            if let Some(child_scope) = find_child_scope(scope, table) {
989                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
990                if let Ok(child) = to_node_inner(
991                    ColumnRef::Name(col_name),
992                    child_scope,
993                    dialect,
994                    parent_name,
995                    table,
996                    parent_name,
997                    trim_selects,
998                    &ancestors,
999                    depth + 1,
1000                ) {
1001                    node.downstream.push(child);
1002                    return;
1003                }
1004            }
1005        }
1006    }
1007
1008    // Base table source found in current scope: preserve alias in the display name
1009    // but store the resolved table expression and name for downstream consumers.
1010    if let Some(source_info) = scope.sources.get(table) {
1011        if !source_info.is_scope {
1012            let mut child = make_table_column_node_from_source(table, col_name, source_info);
1013            if source_info.kind == SourceKind::Virtual {
1014                attach_virtual_source_dependencies(
1015                    &mut child,
1016                    scope,
1017                    dialect,
1018                    table,
1019                    &source_info.expression,
1020                    trim_selects,
1021                    all_cte_scopes,
1022                    depth,
1023                );
1024            }
1025            node.downstream.push(child);
1026            return;
1027        }
1028    }
1029
1030    // Base table or unresolved — terminal node
1031    node.downstream
1032        .push(make_table_column_node(table, col_name));
1033}
1034
1035/// Resolve a FROM alias to the original CTE name.
1036///
1037/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
1038/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
1039/// This function checks if `name` is such an alias and returns the CTE name.
1040fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1041    // If it's already a known CTE name, no resolution needed
1042    if scope.cte_sources.contains_key(name) {
1043        return None;
1044    }
1045    // Check if the source's expression is a CTE — if so, extract the CTE name
1046    if let Some(source_info) = scope.sources.get(name) {
1047        if source_info.is_scope {
1048            if let Expression::Cte(cte) = &source_info.expression {
1049                let cte_name = &cte.alias.name;
1050                if scope.cte_sources.contains_key(cte_name) {
1051                    return Some(cte_name.clone());
1052                }
1053            }
1054        }
1055    }
1056    None
1057}
1058
1059fn resolve_unqualified_column(
1060    node: &mut LineageNode,
1061    scope: &Scope,
1062    dialect: Option<DialectType>,
1063    col_name: &str,
1064    parent_name: &str,
1065    trim_selects: bool,
1066    all_cte_scopes: &[&Scope],
1067    depth: usize,
1068) {
1069    // Try to find which source this column belongs to.
1070    // Build the source list from the actual FROM/JOIN clauses to avoid
1071    // mixing in CTE definitions that are in scope but not referenced.
1072    let from_source_names = source_names_from_from_join(scope);
1073
1074    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1075        resolve_qualified_column(
1076            node,
1077            scope,
1078            dialect,
1079            &tbl,
1080            col_name,
1081            parent_name,
1082            trim_selects,
1083            all_cte_scopes,
1084            depth,
1085        );
1086        return;
1087    }
1088
1089    if from_source_names.len() == 1 {
1090        let tbl = &from_source_names[0];
1091        resolve_qualified_column(
1092            node,
1093            scope,
1094            dialect,
1095            tbl,
1096            col_name,
1097            parent_name,
1098            trim_selects,
1099            all_cte_scopes,
1100            depth,
1101        );
1102        return;
1103    }
1104
1105    // Multiple sources — can't resolve without schema info, add unqualified node
1106    let child = LineageNode::new(
1107        col_name.to_string(),
1108        Expression::Column(Box::new(crate::expressions::Column {
1109            name: crate::expressions::Identifier::new(col_name.to_string()),
1110            table: None,
1111            join_mark: false,
1112            trailing_comments: vec![],
1113            span: None,
1114            inferred_type: None,
1115        })),
1116        node.source.clone(),
1117    );
1118    node.downstream.push(child);
1119}
1120
1121fn unique_virtual_source_for_column(
1122    scope: &Scope,
1123    source_names: &[String],
1124    col_name: &str,
1125) -> Option<String> {
1126    let mut matches = source_names.iter().filter_map(|source_name| {
1127        let source = scope.sources.get(source_name)?;
1128        if source.kind == SourceKind::Virtual
1129            && virtual_source_output_columns(source)
1130                .any(|column| column.eq_ignore_ascii_case(col_name))
1131        {
1132            Some(source_name.clone())
1133        } else {
1134            None
1135        }
1136    });
1137
1138    let first = matches.next()?;
1139    if matches.next().is_none() {
1140        Some(first)
1141    } else {
1142        None
1143    }
1144}
1145
1146fn virtual_source_output_columns(
1147    source_info: &ScopeSourceInfo,
1148) -> Box<dyn Iterator<Item = String> + '_> {
1149    match &source_info.expression {
1150        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1151        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1152            Box::new(alias_output_columns(alias))
1153        }
1154        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1155        Expression::LateralView(lateral_view) => {
1156            Box::new(lateral_view_output_columns(lateral_view))
1157        }
1158        _ => Box::new(source_info.alias.clone().into_iter()),
1159    }
1160}
1161
1162fn unnest_output_columns(
1163    unnest: &crate::expressions::UnnestFunc,
1164) -> impl Iterator<Item = String> + '_ {
1165    unnest
1166        .alias
1167        .iter()
1168        .map(|alias| alias.name.clone())
1169        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1170}
1171
1172fn alias_output_columns(
1173    alias: &crate::expressions::Alias,
1174) -> Box<dyn Iterator<Item = String> + '_> {
1175    if alias.column_aliases.is_empty() {
1176        Box::new(std::iter::once(alias.alias.name.clone()))
1177    } else {
1178        Box::new(
1179            alias
1180                .column_aliases
1181                .iter()
1182                .map(|column| column.name.clone()),
1183        )
1184    }
1185}
1186
1187fn lateral_output_columns(
1188    lateral: &crate::expressions::Lateral,
1189) -> Box<dyn Iterator<Item = String> + '_> {
1190    if lateral.column_aliases.is_empty() {
1191        default_virtual_output_columns(&lateral.this)
1192    } else {
1193        Box::new(lateral.column_aliases.iter().cloned())
1194    }
1195}
1196
1197fn lateral_view_output_columns(
1198    lateral_view: &crate::expressions::LateralView,
1199) -> Box<dyn Iterator<Item = String> + '_> {
1200    Box::new(
1201        lateral_view
1202            .column_aliases
1203            .iter()
1204            .map(|column| column.name.clone()),
1205    )
1206}
1207
1208fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1209    match expr {
1210        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1211        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1212            alias_output_columns(alias)
1213        }
1214        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1215            Box::new(
1216                ["seq", "key", "path", "index", "value", "this"]
1217                    .into_iter()
1218                    .map(String::from),
1219            )
1220        }
1221        _ => Box::new(std::iter::empty()),
1222    }
1223}
1224
1225fn attach_virtual_source_dependencies(
1226    node: &mut LineageNode,
1227    scope: &Scope,
1228    dialect: Option<DialectType>,
1229    source_alias: &str,
1230    source_expr: &Expression,
1231    trim_selects: bool,
1232    all_cte_scopes: &[&Scope],
1233    depth: usize,
1234) {
1235    let parent_name = node.name.clone();
1236    let mut seen = HashSet::new();
1237    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1238        let key = (
1239            col_ref.table.as_ref().map(|t| t.name.clone()),
1240            col_ref.column.clone(),
1241        );
1242        if !seen.insert(key) {
1243            continue;
1244        }
1245
1246        if let Some(table_id) = col_ref.table {
1247            let table = table_id.name;
1248            if table == source_alias {
1249                continue;
1250            }
1251            resolve_qualified_column(
1252                node,
1253                scope,
1254                dialect,
1255                &table,
1256                &col_ref.column,
1257                &parent_name,
1258                trim_selects,
1259                all_cte_scopes,
1260                depth + 1,
1261            );
1262        } else {
1263            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1264            if non_virtual_sources.len() == 1 {
1265                resolve_qualified_column(
1266                    node,
1267                    scope,
1268                    dialect,
1269                    &non_virtual_sources[0],
1270                    &col_ref.column,
1271                    &parent_name,
1272                    trim_selects,
1273                    all_cte_scopes,
1274                    depth + 1,
1275                );
1276            }
1277        }
1278    }
1279}
1280
1281fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1282    fn source_name(expr: &Expression) -> Option<String> {
1283        match expr {
1284            Expression::Table(table) => Some(
1285                table
1286                    .alias
1287                    .as_ref()
1288                    .map(|a| a.name.clone())
1289                    .unwrap_or_else(|| table.name.name.clone()),
1290            ),
1291            Expression::Subquery(subquery) => {
1292                subquery.alias.as_ref().map(|alias| alias.name.clone())
1293            }
1294            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1295            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1296                Some(alias.alias.name.clone())
1297            }
1298            Expression::Lateral(lateral) => lateral.alias.clone(),
1299            Expression::LateralView(lateral_view) => lateral_view
1300                .table_alias
1301                .as_ref()
1302                .or_else(|| lateral_view.column_aliases.first())
1303                .map(|alias| alias.name.clone()),
1304            Expression::Paren(paren) => source_name(&paren.this),
1305            _ => None,
1306        }
1307    }
1308
1309    let effective_expr = match &scope.expression {
1310        Expression::Cte(cte) => &cte.this,
1311        expr => expr,
1312    };
1313
1314    let mut names = Vec::new();
1315    let mut seen = std::collections::HashSet::new();
1316
1317    if let Expression::Select(select) = effective_expr {
1318        if let Some(from) = &select.from {
1319            for expr in &from.expressions {
1320                if let Some(name) = source_name(expr) {
1321                    if !name.is_empty() && seen.insert(name.clone()) {
1322                        names.push(name);
1323                    }
1324                }
1325            }
1326        }
1327        for join in &select.joins {
1328            if let Some(name) = source_name(&join.this) {
1329                if !name.is_empty() && seen.insert(name.clone()) {
1330                    names.push(name);
1331                }
1332            }
1333        }
1334        for lateral_view in &select.lateral_views {
1335            if let Some(name) =
1336                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1337            {
1338                if !name.is_empty() && seen.insert(name.clone()) {
1339                    names.push(name);
1340                }
1341            }
1342        }
1343    }
1344
1345    names
1346}
1347
1348fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1349    source_names_from_from_join(scope)
1350        .into_iter()
1351        .filter(|name| {
1352            !matches!(
1353                scope.sources.get(name).map(|source| source.kind),
1354                Some(SourceKind::Virtual)
1355            )
1356        })
1357        .collect()
1358}
1359
1360// ---------------------------------------------------------------------------
1361// Helper functions
1362// ---------------------------------------------------------------------------
1363
1364/// Get the alias or name of an expression
1365fn get_alias_or_name(expr: &Expression) -> Option<String> {
1366    match expr {
1367        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1368        Expression::Column(col) => Some(col.name.name.clone()),
1369        Expression::Identifier(id) => Some(id.name.clone()),
1370        Expression::Star(_) => Some("*".to_string()),
1371        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1372        // Unwrap to get the actual column/alias name from the inner expression.
1373        Expression::Annotated(a) => get_alias_or_name(&a.this),
1374        _ => None,
1375    }
1376}
1377
1378/// Resolve the display name for a column reference.
1379fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1380    match column {
1381        ColumnRef::Name(n) => n.to_string(),
1382        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1383    }
1384}
1385
1386/// Find the select expression matching a column reference.
1387fn find_select_expr(
1388    scope_expr: &Expression,
1389    column: &ColumnRef<'_>,
1390    dialect: Option<DialectType>,
1391) -> Result<Expression> {
1392    if let Expression::Select(ref select) = scope_expr {
1393        match column {
1394            ColumnRef::Name(name) => {
1395                let normalized_name = normalize_column_name(name, dialect);
1396                for expr in &select.expressions {
1397                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1398                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1399                            return Ok(expr.clone());
1400                        }
1401                    }
1402                }
1403                if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
1404                    return Ok(expr);
1405                }
1406                Err(crate::error::Error::parse(
1407                    format!("Cannot find column '{}' in query", name),
1408                    0,
1409                    0,
1410                    0,
1411                    0,
1412                ))
1413            }
1414            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1415                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1416            }),
1417        }
1418    } else {
1419        Err(crate::error::Error::parse(
1420            "Expected SELECT expression for column lookup",
1421            0,
1422            0,
1423            0,
1424            0,
1425        ))
1426    }
1427}
1428
1429fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
1430    let sources = get_select_sources(select);
1431    if sources.is_empty() {
1432        return None;
1433    }
1434
1435    let mut candidate_aliases = Vec::new();
1436    let mut seen = HashSet::new();
1437
1438    for expr in &select.expressions {
1439        let aliases = match star_passthrough_source_aliases(expr, &sources) {
1440            StarPassthroughSources::None => continue,
1441            StarPassthroughSources::Ambiguous => return None,
1442            StarPassthroughSources::Aliases(aliases) => aliases,
1443        };
1444
1445        for alias in aliases {
1446            if seen.insert(alias.clone()) {
1447                candidate_aliases.push(alias);
1448            }
1449        }
1450    }
1451
1452    match candidate_aliases.as_slice() {
1453        [alias] => {
1454            let table = Identifier::new(alias.clone());
1455            Some(make_column_expr(name, Some(&table)))
1456        }
1457        _ => None,
1458    }
1459}
1460
/// Result of classifying one SELECT-list item while looking for `*`
/// passthrough sources (see `star_passthrough_source_aliases`).
enum StarPassthroughSources {
    /// No usable source: the item is not a star, or the star resolves to no
    /// source it may be attributed to.
    None,
    /// The star cannot be attributed to a single source.
    Ambiguous,
    /// Alias(es) of the source the star expands from.
    Aliases(Vec<String>),
}
1466
1467fn star_passthrough_source_aliases(
1468    expr: &Expression,
1469    sources: &[SourceInfo],
1470) -> StarPassthroughSources {
1471    match expr {
1472        Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
1473        Expression::Column(column) if column.name.name == "*" => {
1474            star_source_aliases(column.table.as_ref(), sources)
1475        }
1476        Expression::Annotated(annotated) => {
1477            star_passthrough_source_aliases(&annotated.this, sources)
1478        }
1479        _ => StarPassthroughSources::None,
1480    }
1481}
1482
1483fn star_source_aliases(
1484    qualifier: Option<&Identifier>,
1485    sources: &[SourceInfo],
1486) -> StarPassthroughSources {
1487    if let Some(qualifier) = qualifier {
1488        let mut aliases = Vec::new();
1489
1490        for source in sources {
1491            if source_matches_star_qualifier(source, qualifier) {
1492                aliases.push(source.alias.clone());
1493            }
1494        }
1495
1496        return match aliases.len() {
1497            0 => StarPassthroughSources::None,
1498            1 => StarPassthroughSources::Aliases(aliases),
1499            _ => StarPassthroughSources::Ambiguous,
1500        };
1501    }
1502
1503    match sources {
1504        // Do not synthesize a source column for unresolved quoted table stars.
1505        // This keeps quoted CTE/table case semantics intact while still allowing
1506        // the schema-less fallback for common unquoted SELECT * passthroughs.
1507        [source] if source.quoted => StarPassthroughSources::None,
1508        [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
1509        [] => StarPassthroughSources::None,
1510        _ => StarPassthroughSources::Ambiguous,
1511    }
1512}
1513
1514fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
1515    if source.normalized == normalize_cte_name(qualifier) {
1516        return true;
1517    }
1518
1519    if qualifier.quoted {
1520        source.alias == qualifier.name
1521    } else {
1522        source.alias.eq_ignore_ascii_case(&qualifier.name)
1523    }
1524}
1525
1526/// Find the positional index of a column name in a set operation's first SELECT branch.
1527fn column_to_index(
1528    set_op_expr: &Expression,
1529    name: &str,
1530    dialect: Option<DialectType>,
1531) -> Result<usize> {
1532    let normalized_name = normalize_column_name(name, dialect);
1533    let mut expr = set_op_expr;
1534    loop {
1535        match expr {
1536            Expression::Union(u) => expr = &u.left,
1537            Expression::Intersect(i) => expr = &i.left,
1538            Expression::Except(e) => expr = &e.left,
1539            Expression::Select(select) => {
1540                for (i, e) in select.expressions.iter().enumerate() {
1541                    if let Some(alias_or_name) = get_alias_or_name(e) {
1542                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1543                            return Ok(i);
1544                        }
1545                    }
1546                }
1547                return Err(crate::error::Error::parse(
1548                    format!("Cannot find column '{}' in set operation", name),
1549                    0,
1550                    0,
1551                    0,
1552                    0,
1553                ));
1554            }
1555            _ => {
1556                return Err(crate::error::Error::parse(
1557                    "Expected SELECT or set operation",
1558                    0,
1559                    0,
1560                    0,
1561                    0,
1562                ))
1563            }
1564        }
1565    }
1566}
1567
/// Normalize a column name for comparison under `dialect`.
///
/// Used on both sides of every column-name comparison in this module, so that
/// lookups agree with each other.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    // NOTE(review): the two boolean flags are interpreted by `crate::schema::normalize_name`;
    // confirm their meaning there before changing them.
    normalize_name(name, dialect, false, true)
}
1571
1572/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1573fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1574    if let Expression::Select(select) = select_expr {
1575        let mut trimmed = select.as_ref().clone();
1576        trimmed.expressions = vec![target_expr.clone()];
1577        Expression::Select(Box::new(trimmed))
1578    } else {
1579        select_expr.clone()
1580    }
1581}
1582
1583/// Find the child scope (CTE or derived table) for a given source name.
1584fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1585    // Check CTE scopes
1586    if scope.cte_sources.contains_key(source_name) {
1587        for cte_scope in &scope.cte_scopes {
1588            if let Expression::Cte(cte) = &cte_scope.expression {
1589                if cte.alias.name == source_name {
1590                    return Some(cte_scope);
1591                }
1592            }
1593        }
1594    }
1595
1596    // Check derived table scopes
1597    if let Some(source_info) = scope.sources.get(source_name) {
1598        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1599            if let Expression::Subquery(sq) = &source_info.expression {
1600                for dt_scope in &scope.derived_table_scopes {
1601                    if dt_scope.expression == sq.this {
1602                        return Some(dt_scope);
1603                    }
1604                }
1605            }
1606        }
1607    }
1608
1609    None
1610}
1611
1612/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1613/// This handles nested CTEs where the current scope doesn't have the CTE scope
1614/// as a direct child but knows about it via cte_sources.
1615fn find_child_scope_in<'a>(
1616    all_cte_scopes: &[&'a Scope],
1617    scope: &'a Scope,
1618    source_name: &str,
1619) -> Option<&'a Scope> {
1620    // First try the scope's own cte_scopes
1621    for cte_scope in &scope.cte_scopes {
1622        if let Expression::Cte(cte) = &cte_scope.expression {
1623            if cte.alias.name == source_name {
1624                return Some(cte_scope);
1625            }
1626        }
1627    }
1628
1629    // Then search through all ancestor CTE scopes
1630    for cte_scope in all_cte_scopes {
1631        if let Expression::Cte(cte) = &cte_scope.expression {
1632            if cte.alias.name == source_name {
1633                return Some(cte_scope);
1634            }
1635        }
1636    }
1637
1638    // Fall back to derived table scopes
1639    if let Some(source_info) = scope.sources.get(source_name) {
1640        if source_info.is_scope {
1641            if let Expression::Subquery(sq) = &source_info.expression {
1642                for dt_scope in &scope.derived_table_scopes {
1643                    if dt_scope.expression == sq.this {
1644                        return Some(dt_scope);
1645                    }
1646                }
1647            }
1648        }
1649    }
1650
1651    None
1652}
1653
1654/// Create a terminal lineage node for a table.column reference.
1655fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1656    let mut node = LineageNode::new(
1657        format!("{}.{}", table, column),
1658        Expression::Column(Box::new(crate::expressions::Column {
1659            name: crate::expressions::Identifier::new(column.to_string()),
1660            table: Some(crate::expressions::Identifier::new(table.to_string())),
1661            join_mark: false,
1662            trailing_comments: vec![],
1663            span: None,
1664            inferred_type: None,
1665        })),
1666        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1667    );
1668    node.source_name = table.to_string();
1669    node.source_kind = SourceKind::Table;
1670    node
1671}
1672
1673fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1674    let mut parts: Vec<String> = Vec::new();
1675    if let Some(catalog) = &table_ref.catalog {
1676        parts.push(catalog.name.clone());
1677    }
1678    if let Some(schema) = &table_ref.schema {
1679        parts.push(schema.name.clone());
1680    }
1681    parts.push(table_ref.name.name.clone());
1682    parts.join(".")
1683}
1684
1685fn apply_source_info_context(
1686    node: &mut LineageNode,
1687    source_key: &str,
1688    source_info: &ScopeSourceInfo,
1689) {
1690    node.source_kind = source_info.kind;
1691    node.source_name =
1692        source_info
1693            .lineage_name
1694            .clone()
1695            .unwrap_or_else(|| match &source_info.expression {
1696                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1697                _ => source_key.to_string(),
1698            });
1699    node.source_alias = source_info.alias.clone();
1700}
1701
1702fn make_table_column_node_from_source(
1703    source_key: &str,
1704    column: &str,
1705    source_info: &ScopeSourceInfo,
1706) -> LineageNode {
1707    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1708    let mut node = LineageNode::new(
1709        format!("{}.{}", lineage_name, column),
1710        Expression::Column(Box::new(crate::expressions::Column {
1711            name: crate::expressions::Identifier::new(column.to_string()),
1712            table: Some(crate::expressions::Identifier::new(
1713                lineage_name.to_string(),
1714            )),
1715            join_mark: false,
1716            trailing_comments: vec![],
1717            span: None,
1718            inferred_type: None,
1719        })),
1720        source_info.expression.clone(),
1721    );
1722
1723    apply_source_info_context(&mut node, source_key, source_info);
1724
1725    node
1726}
1727
1728/// Simple column reference extracted from an expression
1729#[derive(Debug, Clone)]
1730struct SimpleColumnRef {
1731    table: Option<crate::expressions::Identifier>,
1732    column: String,
1733}
1734
1735/// Find all column references in an expression (does not recurse into subqueries).
1736fn find_column_refs_in_expr(
1737    expr: &Expression,
1738    dialect: Option<DialectType>,
1739) -> Vec<SimpleColumnRef> {
1740    let mut refs = Vec::new();
1741    collect_column_refs(expr, dialect, &mut refs);
1742    refs
1743}
1744
1745fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1746    match expr {
1747        Expression::Column(col) => {
1748            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1749        }
1750        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1751        _ => false,
1752    }
1753}
1754
1755fn collect_column_refs(
1756    expr: &Expression,
1757    dialect: Option<DialectType>,
1758    refs: &mut Vec<SimpleColumnRef>,
1759) {
1760    let mut stack: Vec<&Expression> = vec![expr];
1761
1762    while let Some(current) = stack.pop() {
1763        match current {
1764            // === Leaf: collect Column references ===
1765            Expression::Column(col) => {
1766                refs.push(SimpleColumnRef {
1767                    table: col.table.clone(),
1768                    column: col.name.name.clone(),
1769                });
1770            }
1771
1772            // === Boundary: don't recurse into subqueries (handled separately) ===
1773            Expression::Subquery(_) | Expression::Exists(_) => {}
1774
1775            // === BinaryOp variants: left, right ===
1776            Expression::And(op)
1777            | Expression::Or(op)
1778            | Expression::Eq(op)
1779            | Expression::Neq(op)
1780            | Expression::Lt(op)
1781            | Expression::Lte(op)
1782            | Expression::Gt(op)
1783            | Expression::Gte(op)
1784            | Expression::Add(op)
1785            | Expression::Sub(op)
1786            | Expression::Mul(op)
1787            | Expression::Div(op)
1788            | Expression::Mod(op)
1789            | Expression::BitwiseAnd(op)
1790            | Expression::BitwiseOr(op)
1791            | Expression::BitwiseXor(op)
1792            | Expression::BitwiseLeftShift(op)
1793            | Expression::BitwiseRightShift(op)
1794            | Expression::Concat(op)
1795            | Expression::Adjacent(op)
1796            | Expression::TsMatch(op)
1797            | Expression::PropertyEQ(op)
1798            | Expression::ArrayContainsAll(op)
1799            | Expression::ArrayContainedBy(op)
1800            | Expression::ArrayOverlaps(op)
1801            | Expression::JSONBContainsAllTopKeys(op)
1802            | Expression::JSONBContainsAnyTopKeys(op)
1803            | Expression::JSONBDeleteAtPath(op)
1804            | Expression::ExtendsLeft(op)
1805            | Expression::ExtendsRight(op)
1806            | Expression::Is(op)
1807            | Expression::MemberOf(op)
1808            | Expression::NullSafeEq(op)
1809            | Expression::NullSafeNeq(op)
1810            | Expression::Glob(op)
1811            | Expression::Match(op) => {
1812                stack.push(&op.left);
1813                stack.push(&op.right);
1814            }
1815
1816            // === UnaryOp variants: this ===
1817            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1818                stack.push(&u.this);
1819            }
1820
1821            // === UnaryFunc variants: this ===
1822            Expression::Upper(f)
1823            | Expression::Lower(f)
1824            | Expression::Length(f)
1825            | Expression::LTrim(f)
1826            | Expression::RTrim(f)
1827            | Expression::Reverse(f)
1828            | Expression::Abs(f)
1829            | Expression::Sqrt(f)
1830            | Expression::Cbrt(f)
1831            | Expression::Ln(f)
1832            | Expression::Exp(f)
1833            | Expression::Sign(f)
1834            | Expression::Date(f)
1835            | Expression::Time(f)
1836            | Expression::DateFromUnixDate(f)
1837            | Expression::UnixDate(f)
1838            | Expression::UnixSeconds(f)
1839            | Expression::UnixMillis(f)
1840            | Expression::UnixMicros(f)
1841            | Expression::TimeStrToDate(f)
1842            | Expression::DateToDi(f)
1843            | Expression::DiToDate(f)
1844            | Expression::TsOrDiToDi(f)
1845            | Expression::TsOrDsToDatetime(f)
1846            | Expression::TsOrDsToTimestamp(f)
1847            | Expression::YearOfWeek(f)
1848            | Expression::YearOfWeekIso(f)
1849            | Expression::Initcap(f)
1850            | Expression::Ascii(f)
1851            | Expression::Chr(f)
1852            | Expression::Soundex(f)
1853            | Expression::ByteLength(f)
1854            | Expression::Hex(f)
1855            | Expression::LowerHex(f)
1856            | Expression::Unicode(f)
1857            | Expression::Radians(f)
1858            | Expression::Degrees(f)
1859            | Expression::Sin(f)
1860            | Expression::Cos(f)
1861            | Expression::Tan(f)
1862            | Expression::Asin(f)
1863            | Expression::Acos(f)
1864            | Expression::Atan(f)
1865            | Expression::IsNan(f)
1866            | Expression::IsInf(f)
1867            | Expression::ArrayLength(f)
1868            | Expression::ArraySize(f)
1869            | Expression::Cardinality(f)
1870            | Expression::ArrayReverse(f)
1871            | Expression::ArrayDistinct(f)
1872            | Expression::ArrayFlatten(f)
1873            | Expression::ArrayCompact(f)
1874            | Expression::Explode(f)
1875            | Expression::ExplodeOuter(f)
1876            | Expression::ToArray(f)
1877            | Expression::MapFromEntries(f)
1878            | Expression::MapKeys(f)
1879            | Expression::MapValues(f)
1880            | Expression::JsonArrayLength(f)
1881            | Expression::JsonKeys(f)
1882            | Expression::JsonType(f)
1883            | Expression::ParseJson(f)
1884            | Expression::ToJson(f)
1885            | Expression::Typeof(f)
1886            | Expression::BitwiseCount(f)
1887            | Expression::Year(f)
1888            | Expression::Month(f)
1889            | Expression::Day(f)
1890            | Expression::Hour(f)
1891            | Expression::Minute(f)
1892            | Expression::Second(f)
1893            | Expression::DayOfWeek(f)
1894            | Expression::DayOfWeekIso(f)
1895            | Expression::DayOfMonth(f)
1896            | Expression::DayOfYear(f)
1897            | Expression::WeekOfYear(f)
1898            | Expression::Quarter(f)
1899            | Expression::Epoch(f)
1900            | Expression::EpochMs(f)
1901            | Expression::TimeStrToUnix(f)
1902            | Expression::SHA(f)
1903            | Expression::SHA1Digest(f)
1904            | Expression::TimeToUnix(f)
1905            | Expression::JSONBool(f)
1906            | Expression::Int64(f)
1907            | Expression::MD5NumberLower64(f)
1908            | Expression::MD5NumberUpper64(f)
1909            | Expression::DateStrToDate(f)
1910            | Expression::DateToDateStr(f) => {
1911                stack.push(&f.this);
1912            }
1913
1914            // === BinaryFunc variants: this, expression ===
1915            Expression::Power(f)
1916            | Expression::NullIf(f)
1917            | Expression::IfNull(f)
1918            | Expression::Nvl(f)
1919            | Expression::UnixToTimeStr(f)
1920            | Expression::Contains(f)
1921            | Expression::StartsWith(f)
1922            | Expression::EndsWith(f)
1923            | Expression::Levenshtein(f)
1924            | Expression::ModFunc(f)
1925            | Expression::Atan2(f)
1926            | Expression::IntDiv(f)
1927            | Expression::AddMonths(f)
1928            | Expression::MonthsBetween(f)
1929            | Expression::NextDay(f)
1930            | Expression::ArrayContains(f)
1931            | Expression::ArrayPosition(f)
1932            | Expression::ArrayAppend(f)
1933            | Expression::ArrayPrepend(f)
1934            | Expression::ArrayUnion(f)
1935            | Expression::ArrayExcept(f)
1936            | Expression::ArrayRemove(f)
1937            | Expression::StarMap(f)
1938            | Expression::MapFromArrays(f)
1939            | Expression::MapContainsKey(f)
1940            | Expression::ElementAt(f)
1941            | Expression::JsonMergePatch(f)
1942            | Expression::JSONBContains(f)
1943            | Expression::JSONBExtract(f) => {
1944                stack.push(&f.this);
1945                stack.push(&f.expression);
1946            }
1947
1948            // === VarArgFunc variants: expressions ===
1949            Expression::Greatest(f)
1950            | Expression::Least(f)
1951            | Expression::Coalesce(f)
1952            | Expression::ArrayConcat(f)
1953            | Expression::ArrayIntersect(f)
1954            | Expression::ArrayZip(f)
1955            | Expression::MapConcat(f)
1956            | Expression::JsonArray(f) => {
1957                for e in &f.expressions {
1958                    stack.push(e);
1959                }
1960            }
1961
1962            // === AggFunc variants: this, filter, having_max, limit ===
1963            Expression::Sum(f)
1964            | Expression::Avg(f)
1965            | Expression::Min(f)
1966            | Expression::Max(f)
1967            | Expression::ArrayAgg(f)
1968            | Expression::CountIf(f)
1969            | Expression::Stddev(f)
1970            | Expression::StddevPop(f)
1971            | Expression::StddevSamp(f)
1972            | Expression::Variance(f)
1973            | Expression::VarPop(f)
1974            | Expression::VarSamp(f)
1975            | Expression::Median(f)
1976            | Expression::Mode(f)
1977            | Expression::First(f)
1978            | Expression::Last(f)
1979            | Expression::AnyValue(f)
1980            | Expression::ApproxDistinct(f)
1981            | Expression::ApproxCountDistinct(f)
1982            | Expression::LogicalAnd(f)
1983            | Expression::LogicalOr(f)
1984            | Expression::Skewness(f)
1985            | Expression::ArrayConcatAgg(f)
1986            | Expression::ArrayUniqueAgg(f)
1987            | Expression::BoolXorAgg(f)
1988            | Expression::BitwiseAndAgg(f)
1989            | Expression::BitwiseOrAgg(f)
1990            | Expression::BitwiseXorAgg(f) => {
1991                stack.push(&f.this);
1992                if let Some(ref filter) = f.filter {
1993                    stack.push(filter);
1994                }
1995                if let Some((ref expr, _)) = f.having_max {
1996                    stack.push(expr);
1997                }
1998                if let Some(ref limit) = f.limit {
1999                    stack.push(limit);
2000                }
2001            }
2002
2003            // === Generic Function / AggregateFunction: args ===
2004            Expression::Function(func) => {
2005                for arg in &func.args {
2006                    stack.push(arg);
2007                }
2008            }
2009            Expression::AggregateFunction(func) => {
2010                for arg in &func.args {
2011                    stack.push(arg);
2012                }
2013                if let Some(ref filter) = func.filter {
2014                    stack.push(filter);
2015                }
2016                if let Some(ref limit) = func.limit {
2017                    stack.push(limit);
2018                }
2019            }
2020
2021            // === WindowFunction: this (skip Over for lineage purposes) ===
2022            Expression::WindowFunction(wf) => {
2023                stack.push(&wf.this);
2024            }
2025
2026            // === Containers and special expressions ===
2027            Expression::Alias(a) => {
2028                stack.push(&a.this);
2029            }
2030            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
2031                stack.push(&c.this);
2032                if let Some(ref fmt) = c.format {
2033                    stack.push(fmt);
2034                }
2035                if let Some(ref def) = c.default {
2036                    stack.push(def);
2037                }
2038            }
2039            Expression::Paren(p) => {
2040                stack.push(&p.this);
2041            }
2042            Expression::Annotated(a) => {
2043                stack.push(&a.this);
2044            }
2045            Expression::Case(case) => {
2046                if let Some(ref operand) = case.operand {
2047                    stack.push(operand);
2048                }
2049                for (cond, result) in &case.whens {
2050                    stack.push(cond);
2051                    stack.push(result);
2052                }
2053                if let Some(ref else_expr) = case.else_ {
2054                    stack.push(else_expr);
2055                }
2056            }
2057            Expression::Collation(c) => {
2058                stack.push(&c.this);
2059            }
2060            Expression::In(i) => {
2061                stack.push(&i.this);
2062                for e in &i.expressions {
2063                    stack.push(e);
2064                }
2065                if let Some(ref q) = i.query {
2066                    stack.push(q);
2067                }
2068                if let Some(ref u) = i.unnest {
2069                    stack.push(u);
2070                }
2071            }
2072            Expression::Between(b) => {
2073                stack.push(&b.this);
2074                stack.push(&b.low);
2075                stack.push(&b.high);
2076            }
2077            Expression::IsNull(n) => {
2078                stack.push(&n.this);
2079            }
2080            Expression::IsTrue(t) | Expression::IsFalse(t) => {
2081                stack.push(&t.this);
2082            }
2083            Expression::IsJson(j) => {
2084                stack.push(&j.this);
2085            }
2086            Expression::Like(l) | Expression::ILike(l) => {
2087                stack.push(&l.left);
2088                stack.push(&l.right);
2089                if let Some(ref esc) = l.escape {
2090                    stack.push(esc);
2091                }
2092            }
2093            Expression::SimilarTo(s) => {
2094                stack.push(&s.this);
2095                stack.push(&s.pattern);
2096                if let Some(ref esc) = s.escape {
2097                    stack.push(esc);
2098                }
2099            }
2100            Expression::Ordered(o) => {
2101                stack.push(&o.this);
2102            }
2103            Expression::Array(a) => {
2104                for e in &a.expressions {
2105                    stack.push(e);
2106                }
2107            }
2108            Expression::Tuple(t) => {
2109                for e in &t.expressions {
2110                    stack.push(e);
2111                }
2112            }
2113            Expression::Struct(s) => {
2114                for (_, e) in &s.fields {
2115                    stack.push(e);
2116                }
2117            }
2118            Expression::Subscript(s) => {
2119                stack.push(&s.this);
2120                stack.push(&s.index);
2121            }
2122            Expression::Dot(d) => {
2123                stack.push(&d.this);
2124            }
2125            Expression::MethodCall(m) => {
2126                if !matches!(dialect, Some(DialectType::BigQuery))
2127                    || !is_bigquery_safe_namespace_receiver(&m.this)
2128                {
2129                    stack.push(&m.this);
2130                }
2131                for arg in &m.args {
2132                    stack.push(arg);
2133                }
2134            }
2135            Expression::ArraySlice(s) => {
2136                stack.push(&s.this);
2137                if let Some(ref start) = s.start {
2138                    stack.push(start);
2139                }
2140                if let Some(ref end) = s.end {
2141                    stack.push(end);
2142                }
2143            }
2144            Expression::Lambda(l) => {
2145                stack.push(&l.body);
2146            }
2147            Expression::NamedArgument(n) => {
2148                stack.push(&n.value);
2149            }
2150            Expression::Lateral(l) => {
2151                stack.push(&l.this);
2152                if let Some(ref view) = l.view {
2153                    stack.push(view);
2154                }
2155                if let Some(ref outer) = l.outer {
2156                    stack.push(outer);
2157                }
2158                if let Some(ref ordinality) = l.ordinality {
2159                    stack.push(ordinality);
2160                }
2161            }
2162            Expression::LateralView(lv) => {
2163                stack.push(&lv.this);
2164            }
2165            Expression::TryCatch(t) => {
2166                for stmt in &t.try_body {
2167                    stack.push(stmt);
2168                }
2169                if let Some(catch_body) = &t.catch_body {
2170                    for stmt in catch_body {
2171                        stack.push(stmt);
2172                    }
2173                }
2174            }
2175            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2176                stack.push(e);
2177            }
2178
2179            // === Custom function structs ===
2180            Expression::Substring(f) => {
2181                stack.push(&f.this);
2182                stack.push(&f.start);
2183                if let Some(ref len) = f.length {
2184                    stack.push(len);
2185                }
2186            }
2187            Expression::Trim(f) => {
2188                stack.push(&f.this);
2189                if let Some(ref chars) = f.characters {
2190                    stack.push(chars);
2191                }
2192            }
2193            Expression::Replace(f) => {
2194                stack.push(&f.this);
2195                stack.push(&f.old);
2196                stack.push(&f.new);
2197            }
2198            Expression::IfFunc(f) => {
2199                stack.push(&f.condition);
2200                stack.push(&f.true_value);
2201                if let Some(ref fv) = f.false_value {
2202                    stack.push(fv);
2203                }
2204            }
2205            Expression::Nvl2(f) => {
2206                stack.push(&f.this);
2207                stack.push(&f.true_value);
2208                stack.push(&f.false_value);
2209            }
2210            Expression::ConcatWs(f) => {
2211                stack.push(&f.separator);
2212                for e in &f.expressions {
2213                    stack.push(e);
2214                }
2215            }
2216            Expression::Count(f) => {
2217                if let Some(ref this) = f.this {
2218                    stack.push(this);
2219                }
2220                if let Some(ref filter) = f.filter {
2221                    stack.push(filter);
2222                }
2223            }
2224            Expression::GroupConcat(f) => {
2225                stack.push(&f.this);
2226                if let Some(ref sep) = f.separator {
2227                    stack.push(sep);
2228                }
2229                if let Some(ref filter) = f.filter {
2230                    stack.push(filter);
2231                }
2232            }
2233            Expression::StringAgg(f) => {
2234                stack.push(&f.this);
2235                if let Some(ref sep) = f.separator {
2236                    stack.push(sep);
2237                }
2238                if let Some(ref filter) = f.filter {
2239                    stack.push(filter);
2240                }
2241                if let Some(ref limit) = f.limit {
2242                    stack.push(limit);
2243                }
2244            }
2245            Expression::ListAgg(f) => {
2246                stack.push(&f.this);
2247                if let Some(ref sep) = f.separator {
2248                    stack.push(sep);
2249                }
2250                if let Some(ref filter) = f.filter {
2251                    stack.push(filter);
2252                }
2253            }
2254            Expression::SumIf(f) => {
2255                stack.push(&f.this);
2256                stack.push(&f.condition);
2257                if let Some(ref filter) = f.filter {
2258                    stack.push(filter);
2259                }
2260            }
2261            Expression::DateAdd(f) | Expression::DateSub(f) => {
2262                stack.push(&f.this);
2263                stack.push(&f.interval);
2264            }
2265            Expression::DateDiff(f) => {
2266                stack.push(&f.this);
2267                stack.push(&f.expression);
2268            }
2269            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2270                stack.push(&f.this);
2271            }
2272            Expression::Extract(f) => {
2273                stack.push(&f.this);
2274            }
2275            Expression::Round(f) => {
2276                stack.push(&f.this);
2277                if let Some(ref d) = f.decimals {
2278                    stack.push(d);
2279                }
2280            }
2281            Expression::Floor(f) => {
2282                stack.push(&f.this);
2283                if let Some(ref s) = f.scale {
2284                    stack.push(s);
2285                }
2286                if let Some(ref t) = f.to {
2287                    stack.push(t);
2288                }
2289            }
2290            Expression::Ceil(f) => {
2291                stack.push(&f.this);
2292                if let Some(ref d) = f.decimals {
2293                    stack.push(d);
2294                }
2295                if let Some(ref t) = f.to {
2296                    stack.push(t);
2297                }
2298            }
2299            Expression::Log(f) => {
2300                stack.push(&f.this);
2301                if let Some(ref b) = f.base {
2302                    stack.push(b);
2303                }
2304            }
2305            Expression::AtTimeZone(f) => {
2306                stack.push(&f.this);
2307                stack.push(&f.zone);
2308            }
2309            Expression::Lead(f) | Expression::Lag(f) => {
2310                stack.push(&f.this);
2311                if let Some(ref off) = f.offset {
2312                    stack.push(off);
2313                }
2314                if let Some(ref def) = f.default {
2315                    stack.push(def);
2316                }
2317            }
2318            Expression::FirstValue(f) | Expression::LastValue(f) => {
2319                stack.push(&f.this);
2320            }
2321            Expression::NthValue(f) => {
2322                stack.push(&f.this);
2323                stack.push(&f.offset);
2324            }
2325            Expression::Position(f) => {
2326                stack.push(&f.substring);
2327                stack.push(&f.string);
2328                if let Some(ref start) = f.start {
2329                    stack.push(start);
2330                }
2331            }
2332            Expression::Decode(f) => {
2333                stack.push(&f.this);
2334                for (search, result) in &f.search_results {
2335                    stack.push(search);
2336                    stack.push(result);
2337                }
2338                if let Some(ref def) = f.default {
2339                    stack.push(def);
2340                }
2341            }
2342            Expression::CharFunc(f) => {
2343                for arg in &f.args {
2344                    stack.push(arg);
2345                }
2346            }
2347            Expression::ArraySort(f) => {
2348                stack.push(&f.this);
2349                if let Some(ref cmp) = f.comparator {
2350                    stack.push(cmp);
2351                }
2352            }
2353            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2354                stack.push(&f.this);
2355                stack.push(&f.separator);
2356                if let Some(ref nr) = f.null_replacement {
2357                    stack.push(nr);
2358                }
2359            }
2360            Expression::ArrayFilter(f) => {
2361                stack.push(&f.this);
2362                stack.push(&f.filter);
2363            }
2364            Expression::ArrayTransform(f) => {
2365                stack.push(&f.this);
2366                stack.push(&f.transform);
2367            }
2368            Expression::Sequence(f)
2369            | Expression::Generate(f)
2370            | Expression::ExplodingGenerateSeries(f) => {
2371                stack.push(&f.start);
2372                stack.push(&f.stop);
2373                if let Some(ref step) = f.step {
2374                    stack.push(step);
2375                }
2376            }
2377            Expression::JsonExtract(f)
2378            | Expression::JsonExtractScalar(f)
2379            | Expression::JsonQuery(f)
2380            | Expression::JsonValue(f) => {
2381                stack.push(&f.this);
2382                stack.push(&f.path);
2383            }
2384            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2385                stack.push(&f.this);
2386                for p in &f.paths {
2387                    stack.push(p);
2388                }
2389            }
2390            Expression::JsonObject(f) => {
2391                for (k, v) in &f.pairs {
2392                    stack.push(k);
2393                    stack.push(v);
2394                }
2395            }
2396            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2397                stack.push(&f.this);
2398                for (path, val) in &f.path_values {
2399                    stack.push(path);
2400                    stack.push(val);
2401                }
2402            }
2403            Expression::Overlay(f) => {
2404                stack.push(&f.this);
2405                stack.push(&f.replacement);
2406                stack.push(&f.from);
2407                if let Some(ref len) = f.length {
2408                    stack.push(len);
2409                }
2410            }
2411            Expression::Convert(f) => {
2412                stack.push(&f.this);
2413                if let Some(ref style) = f.style {
2414                    stack.push(style);
2415                }
2416            }
2417            Expression::ApproxPercentile(f) => {
2418                stack.push(&f.this);
2419                stack.push(&f.percentile);
2420                if let Some(ref acc) = f.accuracy {
2421                    stack.push(acc);
2422                }
2423                if let Some(ref filter) = f.filter {
2424                    stack.push(filter);
2425                }
2426            }
2427            Expression::Percentile(f)
2428            | Expression::PercentileCont(f)
2429            | Expression::PercentileDisc(f) => {
2430                stack.push(&f.this);
2431                stack.push(&f.percentile);
2432                if let Some(ref filter) = f.filter {
2433                    stack.push(filter);
2434                }
2435            }
2436            Expression::WithinGroup(f) => {
2437                stack.push(&f.this);
2438            }
2439            Expression::Left(f) | Expression::Right(f) => {
2440                stack.push(&f.this);
2441                stack.push(&f.length);
2442            }
2443            Expression::Repeat(f) => {
2444                stack.push(&f.this);
2445                stack.push(&f.times);
2446            }
2447            Expression::Lpad(f) | Expression::Rpad(f) => {
2448                stack.push(&f.this);
2449                stack.push(&f.length);
2450                if let Some(ref fill) = f.fill {
2451                    stack.push(fill);
2452                }
2453            }
2454            Expression::Split(f) => {
2455                stack.push(&f.this);
2456                stack.push(&f.delimiter);
2457            }
2458            Expression::RegexpLike(f) => {
2459                stack.push(&f.this);
2460                stack.push(&f.pattern);
2461                if let Some(ref flags) = f.flags {
2462                    stack.push(flags);
2463                }
2464            }
2465            Expression::RegexpReplace(f) => {
2466                stack.push(&f.this);
2467                stack.push(&f.pattern);
2468                stack.push(&f.replacement);
2469                if let Some(ref flags) = f.flags {
2470                    stack.push(flags);
2471                }
2472            }
2473            Expression::RegexpExtract(f) => {
2474                stack.push(&f.this);
2475                stack.push(&f.pattern);
2476                if let Some(ref group) = f.group {
2477                    stack.push(group);
2478                }
2479            }
2480            Expression::ToDate(f) => {
2481                stack.push(&f.this);
2482                if let Some(ref fmt) = f.format {
2483                    stack.push(fmt);
2484                }
2485            }
2486            Expression::ToTimestamp(f) => {
2487                stack.push(&f.this);
2488                if let Some(ref fmt) = f.format {
2489                    stack.push(fmt);
2490                }
2491            }
2492            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2493                stack.push(&f.this);
2494                stack.push(&f.format);
2495            }
2496            Expression::LastDay(f) => {
2497                stack.push(&f.this);
2498            }
2499            Expression::FromUnixtime(f) => {
2500                stack.push(&f.this);
2501                if let Some(ref fmt) = f.format {
2502                    stack.push(fmt);
2503                }
2504            }
2505            Expression::UnixTimestamp(f) => {
2506                if let Some(ref this) = f.this {
2507                    stack.push(this);
2508                }
2509                if let Some(ref fmt) = f.format {
2510                    stack.push(fmt);
2511                }
2512            }
2513            Expression::MakeDate(f) => {
2514                stack.push(&f.year);
2515                stack.push(&f.month);
2516                stack.push(&f.day);
2517            }
2518            Expression::MakeTimestamp(f) => {
2519                stack.push(&f.year);
2520                stack.push(&f.month);
2521                stack.push(&f.day);
2522                stack.push(&f.hour);
2523                stack.push(&f.minute);
2524                stack.push(&f.second);
2525                if let Some(ref tz) = f.timezone {
2526                    stack.push(tz);
2527                }
2528            }
2529            Expression::TruncFunc(f) => {
2530                stack.push(&f.this);
2531                if let Some(ref d) = f.decimals {
2532                    stack.push(d);
2533                }
2534            }
2535            Expression::ArrayFunc(f) => {
2536                for e in &f.expressions {
2537                    stack.push(e);
2538                }
2539            }
2540            Expression::Unnest(f) => {
2541                stack.push(&f.this);
2542                for e in &f.expressions {
2543                    stack.push(e);
2544                }
2545            }
2546            Expression::StructFunc(f) => {
2547                for (_, e) in &f.fields {
2548                    stack.push(e);
2549                }
2550            }
2551            Expression::StructExtract(f) => {
2552                stack.push(&f.this);
2553            }
2554            Expression::NamedStruct(f) => {
2555                for (k, v) in &f.pairs {
2556                    stack.push(k);
2557                    stack.push(v);
2558                }
2559            }
2560            Expression::MapFunc(f) => {
2561                for k in &f.keys {
2562                    stack.push(k);
2563                }
2564                for v in &f.values {
2565                    stack.push(v);
2566                }
2567            }
2568            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2569                stack.push(&f.this);
2570                stack.push(&f.transform);
2571            }
2572            Expression::JsonArrayAgg(f) => {
2573                stack.push(&f.this);
2574                if let Some(ref filter) = f.filter {
2575                    stack.push(filter);
2576                }
2577            }
2578            Expression::JsonObjectAgg(f) => {
2579                stack.push(&f.key);
2580                stack.push(&f.value);
2581                if let Some(ref filter) = f.filter {
2582                    stack.push(filter);
2583                }
2584            }
2585            Expression::NTile(f) => {
2586                if let Some(ref n) = f.num_buckets {
2587                    stack.push(n);
2588                }
2589            }
2590            Expression::Rand(f) => {
2591                if let Some(ref s) = f.seed {
2592                    stack.push(s);
2593                }
2594                if let Some(ref lo) = f.lower {
2595                    stack.push(lo);
2596                }
2597                if let Some(ref hi) = f.upper {
2598                    stack.push(hi);
2599                }
2600            }
2601            Expression::Any(q) | Expression::All(q) => {
2602                stack.push(&q.this);
2603                stack.push(&q.subquery);
2604            }
2605            Expression::Overlaps(o) => {
2606                if let Some(ref this) = o.this {
2607                    stack.push(this);
2608                }
2609                if let Some(ref expr) = o.expression {
2610                    stack.push(expr);
2611                }
2612                if let Some(ref ls) = o.left_start {
2613                    stack.push(ls);
2614                }
2615                if let Some(ref le) = o.left_end {
2616                    stack.push(le);
2617                }
2618                if let Some(ref rs) = o.right_start {
2619                    stack.push(rs);
2620                }
2621                if let Some(ref re) = o.right_end {
2622                    stack.push(re);
2623                }
2624            }
2625            Expression::Interval(i) => {
2626                if let Some(ref this) = i.this {
2627                    stack.push(this);
2628                }
2629            }
2630            Expression::TimeStrToTime(f) => {
2631                stack.push(&f.this);
2632                if let Some(ref zone) = f.zone {
2633                    stack.push(zone);
2634                }
2635            }
2636            Expression::JSONBExtractScalar(f) => {
2637                stack.push(&f.this);
2638                stack.push(&f.expression);
2639                if let Some(ref jt) = f.json_type {
2640                    stack.push(jt);
2641                }
2642            }
2643            Expression::JSONExtract(f) => {
2644                stack.push(&f.this);
2645                stack.push(&f.expression);
2646                for e in &f.expressions {
2647                    stack.push(e);
2648                }
2649                if let Some(ref option) = f.option {
2650                    stack.push(option);
2651                }
2652                if let Some(ref on_condition) = f.on_condition {
2653                    stack.push(on_condition);
2654                }
2655            }
2656
2657            // === True leaves and non-expression-bearing nodes ===
2658            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2659            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2660            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2661            _ => {}
2662        }
2663    }
2664}
2665
2666// ---------------------------------------------------------------------------
2667// Tests
2668// ---------------------------------------------------------------------------
2669
2670#[cfg(test)]
2671mod tests {
2672    use super::*;
2673    use crate::dialects::{Dialect, DialectType};
2674    use crate::expressions::DataType;
2675    use crate::optimizer::annotate_types::annotate_types;
2676    use crate::parse_one;
2677    use crate::schema::{MappingSchema, Schema};
2678
2679    fn parse(sql: &str) -> Expression {
2680        let dialect = Dialect::get(DialectType::Generic);
2681        let ast = dialect.parse(sql).unwrap();
2682        ast.into_iter().next().unwrap()
2683    }
2684
2685    #[test]
2686    fn test_simple_lineage() {
2687        let expr = parse("SELECT a FROM t");
2688        let node = lineage("a", &expr, None, false).unwrap();
2689
2690        assert_eq!(node.name, "a");
2691        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2692        // Should trace to t.a
2693        let names = node.downstream_names();
2694        assert!(
2695            names.iter().any(|n| n == "t.a"),
2696            "Expected t.a in downstream, got: {:?}",
2697            names
2698        );
2699    }
2700
2701    #[test]
2702    fn test_lineage_walk() {
2703        let root = LineageNode {
2704            name: "col_a".to_string(),
2705            expression: Expression::Null(crate::expressions::Null),
2706            source: Expression::Null(crate::expressions::Null),
2707            downstream: vec![LineageNode::new(
2708                "t.a",
2709                Expression::Null(crate::expressions::Null),
2710                Expression::Null(crate::expressions::Null),
2711            )],
2712            source_name: String::new(),
2713            source_kind: SourceKind::Unknown,
2714            source_alias: None,
2715            reference_node_name: String::new(),
2716        };
2717
2718        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2719        assert_eq!(names.len(), 2);
2720        assert_eq!(names[0], "col_a");
2721        assert_eq!(names[1], "t.a");
2722    }
2723
2724    #[test]
2725    fn test_aliased_column() {
2726        let expr = parse("SELECT a + 1 AS b FROM t");
2727        let node = lineage("b", &expr, None, false).unwrap();
2728
2729        assert_eq!(node.name, "b");
2730        // Should trace through the expression to t.a
2731        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2732        assert!(
2733            all_names.iter().any(|n| n.contains("a")),
2734            "Expected to trace to column a, got: {:?}",
2735            all_names
2736        );
2737    }
2738
2739    #[test]
2740    fn test_qualified_column() {
2741        let expr = parse("SELECT t.a FROM t");
2742        let node = lineage("a", &expr, None, false).unwrap();
2743
2744        assert_eq!(node.name, "a");
2745        let names = node.downstream_names();
2746        assert!(
2747            names.iter().any(|n| n == "t.a"),
2748            "Expected t.a, got: {:?}",
2749            names
2750        );
2751    }
2752
2753    #[test]
2754    fn test_unqualified_column() {
2755        let expr = parse("SELECT a FROM t");
2756        let node = lineage("a", &expr, None, false).unwrap();
2757
2758        // Unqualified but single source → resolved to t.a
2759        let names = node.downstream_names();
2760        assert!(
2761            names.iter().any(|n| n == "t.a"),
2762            "Expected t.a, got: {:?}",
2763            names
2764        );
2765    }
2766
2767    #[test]
2768    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2769        let query = "SELECT name FROM users";
2770        let dialect = Dialect::get(DialectType::BigQuery);
2771        let expr = dialect
2772            .parse(query)
2773            .unwrap()
2774            .into_iter()
2775            .next()
2776            .expect("expected one expression");
2777
2778        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2779        schema
2780            .add_table("users", &[("name".into(), DataType::Text)], None)
2781            .expect("schema setup");
2782
2783        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2784            .expect("lineage without schema");
2785        let mut expr_without = node_without_schema.expression.clone();
2786        annotate_types(
2787            &mut expr_without,
2788            Some(&schema),
2789            Some(DialectType::BigQuery),
2790        );
2791        assert_eq!(
2792            expr_without.inferred_type(),
2793            None,
2794            "Expected unresolved root type without schema-aware lineage qualification"
2795        );
2796
2797        let node_with_schema = lineage_with_schema(
2798            "name",
2799            &expr,
2800            Some(&schema),
2801            Some(DialectType::BigQuery),
2802            false,
2803        )
2804        .expect("lineage with schema");
2805        let mut expr_with = node_with_schema.expression.clone();
2806        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2807
2808        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2809    }
2810
2811    #[test]
2812    fn test_lineage_with_schema_correlated_scalar_subquery() {
2813        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2814        let dialect = Dialect::get(DialectType::BigQuery);
2815        let expr = dialect
2816            .parse(query)
2817            .unwrap()
2818            .into_iter()
2819            .next()
2820            .expect("expected one expression");
2821
2822        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2823        schema
2824            .add_table(
2825                "t1",
2826                &[("id".into(), DataType::BigInt { length: None })],
2827                None,
2828            )
2829            .expect("schema setup");
2830        schema
2831            .add_table(
2832                "t2",
2833                &[
2834                    ("id".into(), DataType::BigInt { length: None }),
2835                    ("val".into(), DataType::BigInt { length: None }),
2836                ],
2837                None,
2838            )
2839            .expect("schema setup");
2840
2841        let node = lineage_with_schema(
2842            "id",
2843            &expr,
2844            Some(&schema),
2845            Some(DialectType::BigQuery),
2846            false,
2847        )
2848        .expect("lineage_with_schema should handle correlated scalar subqueries");
2849
2850        assert_eq!(node.name, "id");
2851    }
2852
2853    #[test]
2854    fn test_lineage_with_schema_join_using() {
2855        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2856        let dialect = Dialect::get(DialectType::BigQuery);
2857        let expr = dialect
2858            .parse(query)
2859            .unwrap()
2860            .into_iter()
2861            .next()
2862            .expect("expected one expression");
2863
2864        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2865        schema
2866            .add_table(
2867                "t1",
2868                &[("a".into(), DataType::BigInt { length: None })],
2869                None,
2870            )
2871            .expect("schema setup");
2872        schema
2873            .add_table(
2874                "t2",
2875                &[("a".into(), DataType::BigInt { length: None })],
2876                None,
2877            )
2878            .expect("schema setup");
2879
2880        let node = lineage_with_schema(
2881            "a",
2882            &expr,
2883            Some(&schema),
2884            Some(DialectType::BigQuery),
2885            false,
2886        )
2887        .expect("lineage_with_schema should handle JOIN USING");
2888
2889        assert_eq!(node.name, "a");
2890    }
2891
2892    #[test]
2893    fn test_lineage_with_schema_qualified_table_name() {
2894        let query = "SELECT a FROM raw.t1";
2895        let dialect = Dialect::get(DialectType::BigQuery);
2896        let expr = dialect
2897            .parse(query)
2898            .unwrap()
2899            .into_iter()
2900            .next()
2901            .expect("expected one expression");
2902
2903        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2904        schema
2905            .add_table(
2906                "raw.t1",
2907                &[("a".into(), DataType::BigInt { length: None })],
2908                None,
2909            )
2910            .expect("schema setup");
2911
2912        let node = lineage_with_schema(
2913            "a",
2914            &expr,
2915            Some(&schema),
2916            Some(DialectType::BigQuery),
2917            false,
2918        )
2919        .expect("lineage_with_schema should handle dotted schema.table names");
2920
2921        assert_eq!(node.name, "a");
2922    }
2923
2924    #[test]
2925    fn test_lineage_with_schema_none_matches_lineage() {
2926        let expr = parse("SELECT a FROM t");
2927        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2928        let with_none =
2929            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2930
2931        assert_eq!(with_none.name, baseline.name);
2932        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2933    }
2934
2935    #[test]
2936    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2937        let dialect = Dialect::get(DialectType::BigQuery);
2938        let expr = dialect
2939            .parse("SELECT Name AS name FROM teams")
2940            .unwrap()
2941            .into_iter()
2942            .next()
2943            .expect("expected one expression");
2944
2945        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2946        schema
2947            .add_table(
2948                "teams",
2949                &[("Name".into(), DataType::String { length: None })],
2950                None,
2951            )
2952            .expect("schema setup");
2953
2954        let node = lineage_with_schema(
2955            "name",
2956            &expr,
2957            Some(&schema),
2958            Some(DialectType::BigQuery),
2959            false,
2960        )
2961        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2962
2963        let names = node.downstream_names();
2964        assert!(
2965            names.iter().any(|n| n == "teams.Name"),
2966            "Expected teams.Name in downstream, got: {:?}",
2967            names
2968        );
2969    }
2970
2971    #[test]
2972    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2973        let dialect = Dialect::get(DialectType::BigQuery);
2974        let expr = dialect
2975            .parse("SELECT Name AS Name FROM teams")
2976            .unwrap()
2977            .into_iter()
2978            .next()
2979            .expect("expected one expression");
2980
2981        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2982            .expect("lineage should resolve mixed-case aliases in BigQuery");
2983
2984        assert_eq!(node.name, "name");
2985    }
2986
2987    #[test]
2988    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2989        let expr = parse_one(
2990            r#"
2991SELECT date_val AS week_start
2992FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2993"#,
2994            DialectType::BigQuery,
2995        )
2996        .expect("parse");
2997
2998        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2999            .expect("lineage should resolve UNNEST alias as a source");
3000        let child = node
3001            .downstream
3002            .first()
3003            .expect("week_start should have downstream lineage");
3004
3005        assert_eq!(child.name, "_0.date_val");
3006        assert_eq!(child.source_name, "_0");
3007        assert_eq!(child.source_kind, SourceKind::Virtual);
3008        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
3009
3010        let Expression::Column(column) = &child.expression else {
3011            panic!(
3012                "expected downstream column expression, got {:?}",
3013                child.expression
3014            );
3015        };
3016        assert_eq!(column.name.name, "date_val");
3017        assert_eq!(
3018            column.table.as_ref().map(|table| table.name.as_str()),
3019            Some("_0")
3020        );
3021        assert!(
3022            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
3023            "expected UNNEST source expression, got {:?}",
3024            child.source
3025        );
3026    }
3027
3028    #[test]
3029    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
3030        let expr =
3031            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
3032
3033        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3034        let child = node.downstream.first().expect("id should have lineage");
3035
3036        assert_eq!(child.name, "date_val.id");
3037        assert_eq!(child.source_name, "date_val");
3038        assert_eq!(child.source_kind, SourceKind::Table);
3039        assert_eq!(child.source_alias, None);
3040    }
3041
3042    #[test]
3043    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
3044        let expr = parse_one(
3045            r#"
3046SELECT a.a AS first_value, b.b AS second_value
3047FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
3048JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
3049"#,
3050            DialectType::BigQuery,
3051        )
3052        .expect("parse");
3053
3054        let first =
3055            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3056        let second =
3057            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3058
3059        let first_child = first.downstream.first().expect("first source");
3060        let second_child = second.downstream.first().expect("second source");
3061
3062        assert_eq!(first_child.name, "_0.a");
3063        assert_eq!(first_child.source_name, "_0");
3064        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
3065        assert_eq!(first_child.source_kind, SourceKind::Virtual);
3066
3067        assert_eq!(second_child.name, "_1.b");
3068        assert_eq!(second_child.source_name, "_1");
3069        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
3070        assert_eq!(second_child.source_kind, SourceKind::Virtual);
3071    }
3072
3073    #[test]
3074    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
3075        let expr = parse_one(
3076            r#"
3077SELECT item.item AS item
3078FROM t JOIN UNNEST(t.items) AS item ON TRUE
3079"#,
3080            DialectType::BigQuery,
3081        )
3082        .expect("parse");
3083
3084        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3085        let virtual_child = node.downstream.first().expect("virtual item source");
3086        assert_eq!(virtual_child.name, "_0.item");
3087        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3088
3089        let real_child = virtual_child
3090            .downstream
3091            .first()
3092            .expect("UNNEST(t.items) should depend on t.items");
3093        assert_eq!(real_child.name, "t.items");
3094        assert_eq!(real_child.source_name, "t");
3095        assert_eq!(real_child.source_kind, SourceKind::Table);
3096    }
3097
3098    #[test]
3099    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
3100        let expr = parse_one(
3101            r#"
3102SELECT item AS item
3103FROM t JOIN UNNEST(t.items) AS item ON TRUE
3104"#,
3105            DialectType::BigQuery,
3106        )
3107        .expect("parse");
3108
3109        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3110        let virtual_child = node.downstream.first().expect("virtual item source");
3111        assert_eq!(virtual_child.name, "_0.item");
3112        assert_eq!(virtual_child.source_name, "_0");
3113        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3114        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
3115
3116        let real_child = virtual_child
3117            .downstream
3118            .first()
3119            .expect("UNNEST(t.items) should depend on t.items");
3120        assert_eq!(real_child.name, "t.items");
3121        assert_eq!(real_child.source_name, "t");
3122        assert_eq!(real_child.source_kind, SourceKind::Table);
3123    }
3124
3125    #[test]
3126    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
3127        let cases = [
3128            (
3129                DialectType::PostgreSQL,
3130                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
3131            ),
3132            (
3133                DialectType::Presto,
3134                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3135            ),
3136            (
3137                DialectType::Trino,
3138                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3139            ),
3140        ];
3141
3142        for (dialect, sql) in cases {
3143            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3144            let node = lineage("out", &expr, Some(dialect), false)
3145                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3146            let virtual_child = node
3147                .downstream
3148                .first()
3149                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3150
3151            assert_eq!(
3152                virtual_child.name, "_0.x",
3153                "unexpected virtual child for {dialect:?}"
3154            );
3155            assert_eq!(virtual_child.source_name, "_0");
3156            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3157            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3158
3159            let real_child = virtual_child
3160                .downstream
3161                .first()
3162                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3163            assert_eq!(real_child.name, "t.items");
3164            assert_eq!(real_child.source_kind, SourceKind::Table);
3165        }
3166    }
3167
3168    #[test]
3169    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3170        let cases = [
3171            (
3172                DialectType::Spark,
3173                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3174            ),
3175            (
3176                DialectType::Hive,
3177                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3178            ),
3179        ];
3180
3181        for (dialect, sql) in cases {
3182            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3183            let node = lineage("out", &expr, Some(dialect), false)
3184                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3185            let virtual_child = node
3186                .downstream
3187                .first()
3188                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3189
3190            assert_eq!(virtual_child.name, "_0.x");
3191            assert_eq!(virtual_child.source_name, "_0");
3192            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3193            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3194
3195            let real_child = virtual_child
3196                .downstream
3197                .first()
3198                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3199            assert_eq!(real_child.name, "t.items");
3200            assert_eq!(real_child.source_kind, SourceKind::Table);
3201        }
3202    }
3203
3204    #[test]
3205    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3206        let expr = parse_one(
3207            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3208            DialectType::Snowflake,
3209        )
3210        .expect("parse");
3211
3212        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3213        let virtual_child = node.downstream.first().expect("virtual flatten source");
3214        assert_eq!(virtual_child.name, "_0.value");
3215        assert_eq!(virtual_child.source_name, "_0");
3216        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3217        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3218
3219        let real_child = virtual_child
3220            .downstream
3221            .first()
3222            .expect("FLATTEN input should depend on raw_events.payload");
3223        assert_eq!(real_child.name, "raw_events.payload");
3224        assert_eq!(real_child.source_kind, SourceKind::Table);
3225    }
3226
3227    #[test]
3228    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3229        let expr = parse_one(
3230            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3231            DialectType::Snowflake,
3232        )
3233        .expect("parse");
3234
3235        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3236        schema
3237            .add_table(
3238                "fact.some_daily_metrics",
3239                &[("date_utc".to_string(), DataType::Date)],
3240                None,
3241            )
3242            .expect("schema setup");
3243
3244        let node = lineage_with_schema(
3245            "recency",
3246            &expr,
3247            Some(&schema),
3248            Some(DialectType::Snowflake),
3249            false,
3250        )
3251        .expect("lineage_with_schema should not treat date part as a column");
3252
3253        let names = node.downstream_names();
3254        assert!(
3255            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3256            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3257            names
3258        );
3259        assert!(
3260            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3261            "Did not expect date part to appear as lineage column, got: {:?}",
3262            names
3263        );
3264    }
3265
3266    #[test]
3267    fn test_snowflake_datediff_parses_to_typed_ast() {
3268        let expr = parse_one(
3269            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3270            DialectType::Snowflake,
3271        )
3272        .expect("parse");
3273
3274        match expr {
3275            Expression::Select(select) => match &select.expressions[0] {
3276                Expression::Alias(alias) => match &alias.this {
3277                    Expression::DateDiff(f) => {
3278                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3279                    }
3280                    other => panic!("expected DateDiff, got {other:?}"),
3281                },
3282                other => panic!("expected Alias, got {other:?}"),
3283            },
3284            other => panic!("expected Select, got {other:?}"),
3285        }
3286    }
3287
3288    #[test]
3289    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3290        let expr = parse_one(
3291            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3292            DialectType::Snowflake,
3293        )
3294        .expect("parse");
3295
3296        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3297        schema
3298            .add_table(
3299                "fact.some_daily_metrics",
3300                &[("date_utc".to_string(), DataType::Date)],
3301                None,
3302            )
3303            .expect("schema setup");
3304
3305        let node = lineage_with_schema(
3306            "next_day",
3307            &expr,
3308            Some(&schema),
3309            Some(DialectType::Snowflake),
3310            false,
3311        )
3312        .expect("lineage_with_schema should not treat DATEADD date part as a column");
3313
3314        let names = node.downstream_names();
3315        assert!(
3316            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3317            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3318            names
3319        );
3320        assert!(
3321            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3322            "Did not expect date part to appear as lineage column, got: {:?}",
3323            names
3324        );
3325    }
3326
3327    #[test]
3328    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3329        let expr = parse_one(
3330            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3331            DialectType::Snowflake,
3332        )
3333        .expect("parse");
3334
3335        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3336        schema
3337            .add_table(
3338                "fact.some_daily_metrics",
3339                &[("date_utc".to_string(), DataType::Date)],
3340                None,
3341            )
3342            .expect("schema setup");
3343
3344        let node = lineage_with_schema(
3345            "day_part",
3346            &expr,
3347            Some(&schema),
3348            Some(DialectType::Snowflake),
3349            false,
3350        )
3351        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3352
3353        let names = node.downstream_names();
3354        assert!(
3355            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3356            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3357            names
3358        );
3359        assert!(
3360            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3361            "Did not expect date part to appear as lineage column, got: {:?}",
3362            names
3363        );
3364    }
3365
3366    #[test]
3367    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3368        let expr = parse_one(
3369            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3370            DialectType::Snowflake,
3371        )
3372        .expect("parse");
3373
3374        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3375        schema
3376            .add_table(
3377                "fact.some_daily_metrics",
3378                &[("date_utc".to_string(), DataType::Date)],
3379                None,
3380            )
3381            .expect("schema setup");
3382
3383        let node = lineage_with_schema(
3384            "day_part",
3385            &expr,
3386            Some(&schema),
3387            Some(DialectType::Snowflake),
3388            false,
3389        )
3390        .expect("quoted DATE_PART should continue to work");
3391
3392        let names = node.downstream_names();
3393        assert!(
3394            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3395            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3396            names
3397        );
3398    }
3399
3400    #[test]
3401    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
3402        let expr = parse_one(
3403            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3404            DialectType::Snowflake,
3405        )
3406        .expect("parse");
3407
3408        match expr {
3409            Expression::Select(select) => match &select.expressions[0] {
3410                Expression::Alias(alias) => match &alias.this {
3411                    Expression::Function(f) => {
3412                        assert_eq!(f.name.to_uppercase(), "DATEADD");
3413                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3414                    }
3415                    other => panic!("expected generic DATEADD function, got {other:?}"),
3416                },
3417                other => panic!("expected Alias, got {other:?}"),
3418            },
3419            other => panic!("expected Select, got {other:?}"),
3420        }
3421    }
3422
3423    #[test]
3424    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
3425        let expr = parse_one(
3426            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3427            DialectType::Snowflake,
3428        )
3429        .expect("parse");
3430
3431        match expr {
3432            Expression::Select(select) => match &select.expressions[0] {
3433                Expression::Alias(alias) => match &alias.this {
3434                    Expression::Function(f) => {
3435                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3436                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
3437                    }
3438                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3439                },
3440                other => panic!("expected Alias, got {other:?}"),
3441            },
3442            other => panic!("expected Select, got {other:?}"),
3443        }
3444    }
3445
3446    #[test]
3447    fn test_snowflake_date_part_string_literal_stays_generic_function() {
3448        let expr = parse_one(
3449            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3450            DialectType::Snowflake,
3451        )
3452        .expect("parse");
3453
3454        match expr {
3455            Expression::Select(select) => match &select.expressions[0] {
3456                Expression::Alias(alias) => match &alias.this {
3457                    Expression::Function(f) => {
3458                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
3459                    }
3460                    other => panic!("expected generic DATE_PART function, got {other:?}"),
3461                },
3462                other => panic!("expected Alias, got {other:?}"),
3463            },
3464            other => panic!("expected Select, got {other:?}"),
3465        }
3466    }
3467
3468    #[test]
3469    fn test_lineage_join() {
3470        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3471
3472        let node_a = lineage("a", &expr, None, false).unwrap();
3473        let names_a = node_a.downstream_names();
3474        assert!(
3475            names_a.iter().any(|n| n == "t.a"),
3476            "Expected t.a, got: {:?}",
3477            names_a
3478        );
3479
3480        let node_b = lineage("b", &expr, None, false).unwrap();
3481        let names_b = node_b.downstream_names();
3482        assert!(
3483            names_b.iter().any(|n| n == "s.b"),
3484            "Expected s.b, got: {:?}",
3485            names_b
3486        );
3487    }
3488
3489    #[test]
3490    fn test_lineage_alias_leaf_has_resolved_source_name() {
3491        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3492        let node = lineage("col1", &expr, None, false).unwrap();
3493
3494        // Keep alias in the display lineage edge.
3495        let names = node.downstream_names();
3496        assert!(
3497            names.iter().any(|n| n == "t1.col1"),
3498            "Expected aliased column edge t1.col1, got: {:?}",
3499            names
3500        );
3501
3502        // Leaf should expose the resolved base table for consumers.
3503        let leaf = node
3504            .downstream
3505            .iter()
3506            .find(|n| n.name == "t1.col1")
3507            .expect("Expected t1.col1 leaf");
3508        assert_eq!(leaf.source_name, "table1");
3509        match &leaf.source {
3510            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3511            _ => panic!("Expected leaf source to be a table expression"),
3512        }
3513    }
3514
3515    #[test]
3516    fn test_lineage_derived_table() {
3517        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3518        let node = lineage("a", &expr, None, false).unwrap();
3519
3520        assert_eq!(node.name, "a");
3521        // Should trace through the derived table to t.a
3522        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3523        assert!(
3524            all_names.iter().any(|n| n == "t.a"),
3525            "Expected to trace through derived table to t.a, got: {:?}",
3526            all_names
3527        );
3528    }
3529
3530    #[test]
3531    fn test_lineage_cte() {
3532        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3533        let node = lineage("a", &expr, None, false).unwrap();
3534
3535        assert_eq!(node.name, "a");
3536        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3537        assert!(
3538            all_names.iter().any(|n| n == "t.a"),
3539            "Expected to trace through CTE to t.a, got: {:?}",
3540            all_names
3541        );
3542    }
3543
3544    #[test]
3545    fn test_lineage_union() {
3546        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3547        let node = lineage("a", &expr, None, false).unwrap();
3548
3549        assert_eq!(node.name, "a");
3550        // Should have 2 downstream branches
3551        assert_eq!(
3552            node.downstream.len(),
3553            2,
3554            "Expected 2 branches for UNION, got {}",
3555            node.downstream.len()
3556        );
3557    }
3558
3559    #[test]
3560    fn test_lineage_cte_union() {
3561        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3562        let node = lineage("a", &expr, None, false).unwrap();
3563
3564        // Should trace through CTE into both UNION branches
3565        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3566        assert!(
3567            all_names.len() >= 3,
3568            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3569            all_names
3570        );
3571    }
3572
3573    #[test]
3574    fn test_lineage_star() {
3575        let expr = parse("SELECT * FROM t");
3576        let node = lineage("*", &expr, None, false).unwrap();
3577
3578        assert_eq!(node.name, "*");
3579        // Should have downstream for table t
3580        assert!(
3581            !node.downstream.is_empty(),
3582            "Star should produce downstream nodes"
3583        );
3584    }
3585
3586    #[test]
3587    fn test_lineage_subquery_in_select() {
3588        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3589        let node = lineage("x", &expr, None, false).unwrap();
3590
3591        assert_eq!(node.name, "x");
3592        // Should have traced into the scalar subquery
3593        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3594        assert!(
3595            all_names.len() >= 2,
3596            "Expected tracing into scalar subquery, got: {:?}",
3597            all_names
3598        );
3599    }
3600
3601    #[test]
3602    fn test_lineage_multiple_columns() {
3603        let expr = parse("SELECT a, b FROM t");
3604
3605        let node_a = lineage("a", &expr, None, false).unwrap();
3606        let node_b = lineage("b", &expr, None, false).unwrap();
3607
3608        assert_eq!(node_a.name, "a");
3609        assert_eq!(node_b.name, "b");
3610
3611        // Each should trace independently
3612        let names_a = node_a.downstream_names();
3613        let names_b = node_b.downstream_names();
3614        assert!(names_a.iter().any(|n| n == "t.a"));
3615        assert!(names_b.iter().any(|n| n == "t.b"));
3616    }
3617
3618    #[test]
3619    fn test_get_source_tables() {
3620        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3621        let node = lineage("a", &expr, None, false).unwrap();
3622
3623        let tables = get_source_tables(&node);
3624        assert!(
3625            tables.contains("t"),
3626            "Expected source table 't', got: {:?}",
3627            tables
3628        );
3629    }
3630
3631    #[test]
3632    fn test_lineage_column_not_found() {
3633        let expr = parse("SELECT a FROM t");
3634        let result = lineage("nonexistent", &expr, None, false);
3635        assert!(result.is_err());
3636    }
3637
3638    #[test]
3639    fn test_lineage_nested_cte() {
3640        let expr = parse(
3641            "WITH cte1 AS (SELECT a FROM t), \
3642             cte2 AS (SELECT a FROM cte1) \
3643             SELECT a FROM cte2",
3644        );
3645        let node = lineage("a", &expr, None, false).unwrap();
3646
3647        // Should trace through cte2 → cte1 → t
3648        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3649        assert!(
3650            all_names.len() >= 3,
3651            "Expected to trace through nested CTEs, got: {:?}",
3652            all_names
3653        );
3654    }
3655
3656    #[test]
3657    fn test_trim_selects_true() {
3658        let expr = parse("SELECT a, b, c FROM t");
3659        let node = lineage("a", &expr, None, true).unwrap();
3660
3661        // The source should be trimmed to only include 'a'
3662        if let Expression::Select(select) = &node.source {
3663            assert_eq!(
3664                select.expressions.len(),
3665                1,
3666                "Trimmed source should have 1 expression, got {}",
3667                select.expressions.len()
3668            );
3669        } else {
3670            panic!("Expected Select source");
3671        }
3672    }
3673
3674    #[test]
3675    fn test_trim_selects_false() {
3676        let expr = parse("SELECT a, b, c FROM t");
3677        let node = lineage("a", &expr, None, false).unwrap();
3678
3679        // The source should keep all columns
3680        if let Expression::Select(select) = &node.source {
3681            assert_eq!(
3682                select.expressions.len(),
3683                3,
3684                "Untrimmed source should have 3 expressions"
3685            );
3686        } else {
3687            panic!("Expected Select source");
3688        }
3689    }
3690
3691    #[test]
3692    fn test_lineage_expression_in_select() {
3693        let expr = parse("SELECT a + b AS c FROM t");
3694        let node = lineage("c", &expr, None, false).unwrap();
3695
3696        // Should trace to both a and b from t
3697        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3698        assert!(
3699            all_names.len() >= 3,
3700            "Expected to trace a + b to both columns, got: {:?}",
3701            all_names
3702        );
3703    }
3704
3705    #[test]
3706    fn test_set_operation_by_index() {
3707        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3708
3709        // Trace column "a" which is at index 0
3710        let node = lineage("a", &expr, None, false).unwrap();
3711
3712        // UNION branches should be traced by index
3713        assert_eq!(node.downstream.len(), 2);
3714    }
3715
3716    // --- Tests for column lineage inside function calls (issue #18) ---
3717
3718    fn print_node(node: &LineageNode, indent: usize) {
3719        let pad = "  ".repeat(indent);
3720        println!(
3721            "{pad}name={:?} source_name={:?}",
3722            node.name, node.source_name
3723        );
3724        for child in &node.downstream {
3725            print_node(child, indent + 1);
3726        }
3727    }
3728
3729    #[test]
3730    fn test_issue18_repro() {
3731        // Exact scenario from the issue
3732        let query = "SELECT UPPER(name) as upper_name FROM users";
3733        println!("Query: {query}\n");
3734
3735        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3736        let exprs = dialect.parse(query).unwrap();
3737        let expr = &exprs[0];
3738
3739        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3740        println!("lineage(\"upper_name\"):");
3741        print_node(&node, 1);
3742
3743        let names = node.downstream_names();
3744        assert!(
3745            names.iter().any(|n| n == "users.name"),
3746            "Expected users.name in downstream, got: {:?}",
3747            names
3748        );
3749    }
3750
3751    #[test]
3752    fn test_lineage_bigquery_safe_namespace_issue207() {
3753        let query = r#"
3754WITH import_cte AS (
3755  SELECT timestamp, data, operation
3756  FROM `project`.`dataset`.`source_table`
3757),
3758transform_cte AS (
3759  SELECT
3760    timestamp,
3761    SAFE.PARSE_JSON(data) AS json_data
3762  FROM import_cte
3763)
3764SELECT json_data FROM transform_cte
3765"#;
3766        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3767        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3768            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3769        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3770
3771        assert!(
3772            names.iter().any(|name| name == "source_table.data"),
3773            "expected source_table.data in lineage, got {names:?}"
3774        );
3775        assert!(
3776            !names
3777                .iter()
3778                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3779            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3780        );
3781    }
3782
3783    #[test]
3784    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3785        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3786        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3787            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3788        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3789
3790        assert!(
3791            names.iter().any(|name| name == "t.data"),
3792            "expected t.data in lineage, got {names:?}"
3793        );
3794        assert!(
3795            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3796            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3797        );
3798    }
3799
3800    #[test]
3801    fn test_lineage_method_call_receiver_control() {
3802        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3803        let node = lineage("out", &expr, None, false).expect("lineage");
3804        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3805
3806        assert!(
3807            names.iter().any(|name| name == "t.obj"),
3808            "expected ordinary method receiver to remain in lineage, got {names:?}"
3809        );
3810        assert!(
3811            names.iter().any(|name| name == "t.arg"),
3812            "expected method argument in lineage, got {names:?}"
3813        );
3814    }
3815
3816    #[test]
3817    fn test_lineage_upper_function() {
3818        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3819        let node = lineage("upper_name", &expr, None, false).unwrap();
3820
3821        let names = node.downstream_names();
3822        assert!(
3823            names.iter().any(|n| n == "users.name"),
3824            "Expected users.name in downstream, got: {:?}",
3825            names
3826        );
3827    }
3828
3829    #[test]
3830    fn test_lineage_round_function() {
3831        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3832        let node = lineage("rounded", &expr, None, false).unwrap();
3833
3834        let names = node.downstream_names();
3835        assert!(
3836            names.iter().any(|n| n == "products.price"),
3837            "Expected products.price in downstream, got: {:?}",
3838            names
3839        );
3840    }
3841
3842    #[test]
3843    fn test_lineage_coalesce_function() {
3844        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3845        let node = lineage("val", &expr, None, false).unwrap();
3846
3847        let names = node.downstream_names();
3848        assert!(
3849            names.iter().any(|n| n == "t.a"),
3850            "Expected t.a in downstream, got: {:?}",
3851            names
3852        );
3853        assert!(
3854            names.iter().any(|n| n == "t.b"),
3855            "Expected t.b in downstream, got: {:?}",
3856            names
3857        );
3858    }
3859
3860    #[test]
3861    fn test_lineage_count_function() {
3862        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3863        let node = lineage("cnt", &expr, None, false).unwrap();
3864
3865        let names = node.downstream_names();
3866        assert!(
3867            names.iter().any(|n| n == "t.id"),
3868            "Expected t.id in downstream, got: {:?}",
3869            names
3870        );
3871    }
3872
3873    #[test]
3874    fn test_lineage_sum_function() {
3875        let expr = parse("SELECT SUM(amount) AS total FROM t");
3876        let node = lineage("total", &expr, None, false).unwrap();
3877
3878        let names = node.downstream_names();
3879        assert!(
3880            names.iter().any(|n| n == "t.amount"),
3881            "Expected t.amount in downstream, got: {:?}",
3882            names
3883        );
3884    }
3885
3886    #[test]
3887    fn test_lineage_case_with_nested_functions() {
3888        let expr =
3889            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3890        let node = lineage("result", &expr, None, false).unwrap();
3891
3892        let names = node.downstream_names();
3893        assert!(
3894            names.iter().any(|n| n == "t.x"),
3895            "Expected t.x in downstream, got: {:?}",
3896            names
3897        );
3898        assert!(
3899            names.iter().any(|n| n == "t.name"),
3900            "Expected t.name in downstream, got: {:?}",
3901            names
3902        );
3903    }
3904
3905    #[test]
3906    fn test_lineage_substring_function() {
3907        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3908        let node = lineage("short", &expr, None, false).unwrap();
3909
3910        let names = node.downstream_names();
3911        assert!(
3912            names.iter().any(|n| n == "t.name"),
3913            "Expected t.name in downstream, got: {:?}",
3914            names
3915        );
3916    }
3917
3918    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3919
3920    #[test]
3921    fn test_lineage_cte_select_star() {
3922        // Ported from sqlglot: test_lineage_source_with_star
3923        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3924        // After star expansion: SELECT y.a AS a FROM y
3925        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3926        let node = lineage("a", &expr, None, false).unwrap();
3927
3928        assert_eq!(node.name, "a");
3929        // Should successfully resolve column 'a' through the CTE
3930        // (previously failed with "Cannot find column 'a' in query")
3931        assert!(
3932            !node.downstream.is_empty(),
3933            "Expected downstream nodes tracing through CTE, got none"
3934        );
3935    }
3936
3937    #[test]
3938    fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
3939        let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
3940        let node = lineage("x", &expr, None, false).unwrap();
3941
3942        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3943        assert!(
3944            all_names.iter().any(|name| name == "t.x"),
3945            "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
3946            all_names
3947        );
3948
3949        let cte_node = node
3950            .walk()
3951            .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
3952            .expect("expected CTE hop with source_name c");
3953        assert_eq!(cte_node.source_kind, SourceKind::Cte);
3954        assert_eq!(cte_node.source_name, "c");
3955    }
3956
3957    #[test]
3958    fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
3959        let expr = parse(
3960            "WITH c AS (SELECT * FROM t) \
3961             SELECT SUM(c.x) AS s FROM c GROUP BY 1",
3962        );
3963        let node = lineage("s", &expr, None, false).unwrap();
3964
3965        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3966        assert!(
3967            all_names.iter().any(|name| name == "t.x"),
3968            "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
3969            all_names
3970        );
3971    }
3972
3973    #[test]
3974    fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
3975        let expr = parse(
3976            "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
3977             SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
3978        );
3979        let node = lineage("s", &expr, None, false).unwrap();
3980
3981        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3982        assert!(
3983            all_names.iter().any(|name| name == "t2.x"),
3984            "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
3985            all_names
3986        );
3987    }
3988
3989    #[test]
3990    fn test_lineage_schema_less_chained_cte_star_passthrough() {
3991        let expr = parse(
3992            "WITH c1 AS (SELECT * FROM t), \
3993             c2 AS (SELECT * FROM c1), \
3994             c3 AS (SELECT * FROM c2) \
3995             SELECT c3.x FROM c3",
3996        );
3997        let node = lineage("x", &expr, None, false).unwrap();
3998
3999        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4000        assert!(
4001            all_names.iter().any(|name| name == "t.x"),
4002            "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4003            all_names
4004        );
4005    }
4006
4007    #[test]
4008    fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4009        let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4010        let result = lineage("x", &expr, None, false);
4011
4012        assert!(
4013            result.is_err(),
4014            "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4015            result
4016        );
4017    }
4018
4019    #[test]
4020    fn test_lineage_cte_select_star_renamed_column() {
4021        // dbt standard pattern: CTE with column rename + outer SELECT *
4022        // This is the primary use case for dbt projects (jaffle-shop etc.)
4023        let expr =
4024            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
4025        let node = lineage("customer_id", &expr, None, false).unwrap();
4026
4027        assert_eq!(node.name, "customer_id");
4028        // Should trace customer_id → renamed CTE → source.id
4029        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4030        assert!(
4031            all_names.len() >= 2,
4032            "Expected at least 2 nodes (customer_id → source), got: {:?}",
4033            all_names
4034        );
4035    }
4036
4037    #[test]
4038    fn test_lineage_cte_select_star_multiple_columns() {
4039        // CTE exposes multiple columns, outer SELECT * should resolve each
4040        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
4041
4042        for col in &["a", "b", "c"] {
4043            let node = lineage(col, &expr, None, false).unwrap();
4044            assert_eq!(node.name, *col);
4045            // Verify lineage resolves without error (star expanded to explicit columns)
4046            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4047            assert!(
4048                all_names.len() >= 2,
4049                "Expected at least 2 nodes for column {}, got: {:?}",
4050                col,
4051                all_names
4052            );
4053        }
4054    }
4055
4056    #[test]
4057    fn test_lineage_nested_cte_select_star() {
4058        // Nested CTE star expansion: cte2 references cte1 via SELECT *
4059        let expr = parse(
4060            "WITH cte1 AS (SELECT a FROM t), \
4061             cte2 AS (SELECT * FROM cte1) \
4062             SELECT * FROM cte2",
4063        );
4064        let node = lineage("a", &expr, None, false).unwrap();
4065
4066        assert_eq!(node.name, "a");
4067        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4068        assert!(
4069            all_names.len() >= 3,
4070            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
4071            all_names
4072        );
4073    }
4074
4075    #[test]
4076    fn test_lineage_three_level_nested_cte_star() {
4077        // Three-level nested CTE: cte3 → cte2 → cte1 → t
4078        let expr = parse(
4079            "WITH cte1 AS (SELECT x FROM t), \
4080             cte2 AS (SELECT * FROM cte1), \
4081             cte3 AS (SELECT * FROM cte2) \
4082             SELECT * FROM cte3",
4083        );
4084        let node = lineage("x", &expr, None, false).unwrap();
4085
4086        assert_eq!(node.name, "x");
4087        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4088        assert!(
4089            all_names.len() >= 4,
4090            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
4091            all_names
4092        );
4093    }
4094
4095    #[test]
4096    fn test_lineage_cte_union_star() {
4097        // CTE with UNION body, outer SELECT * should resolve from left branch
4098        let expr = parse(
4099            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
4100             SELECT * FROM cte",
4101        );
4102        let node = lineage("a", &expr, None, false).unwrap();
4103
4104        assert_eq!(node.name, "a");
4105        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4106        assert!(
4107            all_names.len() >= 2,
4108            "Expected at least 2 nodes for CTE union star, got: {:?}",
4109            all_names
4110        );
4111    }
4112
4113    #[test]
4114    fn test_lineage_cte_star_unknown_table() {
4115        // When CTE references an unknown table, star expansion is skipped gracefully
4116        // and lineage falls back to normal resolution (which may fail)
4117        let expr = parse(
4118            "WITH cte AS (SELECT * FROM unknown_table) \
4119             SELECT * FROM cte",
4120        );
4121        // This should not panic — it may succeed or fail depending on resolution,
4122        // but should not crash
4123        let _result = lineage("x", &expr, None, false);
4124    }
4125
4126    #[test]
4127    fn test_lineage_cte_explicit_columns() {
4128        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
4129        let expr = parse(
4130            "WITH cte(x, y) AS (SELECT a, b FROM t) \
4131             SELECT * FROM cte",
4132        );
4133        let node = lineage("x", &expr, None, false).unwrap();
4134        assert_eq!(node.name, "x");
4135    }
4136
4137    #[test]
4138    fn test_lineage_cte_qualified_star() {
4139        // Qualified star: SELECT cte.* FROM cte
4140        let expr = parse(
4141            "WITH cte AS (SELECT a, b FROM t) \
4142             SELECT cte.* FROM cte",
4143        );
4144        for col in &["a", "b"] {
4145            let node = lineage(col, &expr, None, false).unwrap();
4146            assert_eq!(node.name, *col);
4147            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4148            assert!(
4149                all_names.len() >= 2,
4150                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
4151                col,
4152                all_names
4153            );
4154        }
4155    }
4156
4157    #[test]
4158    fn test_lineage_subquery_select_star() {
4159        // Ported from sqlglot: test_select_star
4160        // SELECT x FROM (SELECT * FROM table_a)
4161        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
4162        let node = lineage("x", &expr, None, false).unwrap();
4163
4164        assert_eq!(node.name, "x");
4165        assert!(
4166            !node.downstream.is_empty(),
4167            "Expected downstream nodes for subquery with SELECT *, got none"
4168        );
4169    }
4170
4171    #[test]
4172    fn test_lineage_cte_star_with_schema_external_table() {
4173        // CTE references an external table via SELECT * — schema enables expansion
4174        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
4175SELECT * FROM orders"#;
4176        let expr = parse(sql);
4177
4178        let mut schema = MappingSchema::new();
4179        let cols = vec![
4180            ("order_id".to_string(), DataType::Unknown),
4181            ("customer_id".to_string(), DataType::Unknown),
4182            ("amount".to_string(), DataType::Unknown),
4183        ];
4184        schema.add_table("stg_orders", &cols, None).unwrap();
4185
4186        let node =
4187            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4188                .unwrap();
4189        assert_eq!(node.name, "order_id");
4190    }
4191
4192    #[test]
4193    fn test_lineage_cte_star_with_schema_three_part_name() {
4194        // CTE references an external table with fully-qualified 3-part name
4195        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
4196SELECT * FROM orders"#;
4197        let expr = parse(sql);
4198
4199        let mut schema = MappingSchema::new();
4200        let cols = vec![
4201            ("order_id".to_string(), DataType::Unknown),
4202            ("customer_id".to_string(), DataType::Unknown),
4203        ];
4204        schema
4205            .add_table("db.schema.stg_orders", &cols, None)
4206            .unwrap();
4207
4208        let node = lineage_with_schema(
4209            "customer_id",
4210            &expr,
4211            Some(&schema as &dyn Schema),
4212            None,
4213            false,
4214        )
4215        .unwrap();
4216        assert_eq!(node.name, "customer_id");
4217    }
4218
4219    #[test]
4220    fn test_lineage_cte_star_with_schema_nested() {
4221        // Nested CTEs: outer CTE references inner CTE with SELECT *,
4222        // inner CTE references external table with SELECT *
4223        let sql = r#"WITH
4224            raw AS (SELECT * FROM external_table),
4225            enriched AS (SELECT * FROM raw)
4226        SELECT * FROM enriched"#;
4227        let expr = parse(sql);
4228
4229        let mut schema = MappingSchema::new();
4230        let cols = vec![
4231            ("id".to_string(), DataType::Unknown),
4232            ("name".to_string(), DataType::Unknown),
4233        ];
4234        schema.add_table("external_table", &cols, None).unwrap();
4235
4236        let node =
4237            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4238        assert_eq!(node.name, "name");
4239    }
4240
4241    #[test]
4242    fn test_lineage_cte_qualified_star_with_schema() {
4243        // CTE uses qualified star (orders.*) from a CTE whose columns
4244        // come from an external table via SELECT *
4245        let sql = r#"WITH
4246            orders AS (SELECT * FROM stg_orders),
4247            enriched AS (
4248                SELECT orders.*, 'extra' AS extra
4249                FROM orders
4250            )
4251        SELECT * FROM enriched"#;
4252        let expr = parse(sql);
4253
4254        let mut schema = MappingSchema::new();
4255        let cols = vec![
4256            ("order_id".to_string(), DataType::Unknown),
4257            ("total".to_string(), DataType::Unknown),
4258        ];
4259        schema.add_table("stg_orders", &cols, None).unwrap();
4260
4261        let node =
4262            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4263                .unwrap();
4264        assert_eq!(node.name, "order_id");
4265
4266        // Also verify the extra column works
4267        let extra =
4268            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4269        assert_eq!(extra.name, "extra");
4270    }
4271
4272    #[test]
4273    fn test_lineage_cte_star_without_schema_still_works() {
4274        // Without schema, CTE-to-CTE star expansion still works
4275        let sql = r#"WITH
4276            cte1 AS (SELECT id, name FROM raw_table),
4277            cte2 AS (SELECT * FROM cte1)
4278        SELECT * FROM cte2"#;
4279        let expr = parse(sql);
4280
4281        // No schema — should still resolve through CTE chain
4282        let node = lineage("id", &expr, None, false).unwrap();
4283        assert_eq!(node.name, "id");
4284    }
4285
4286    #[test]
4287    fn test_lineage_nested_cte_star_with_join_and_schema() {
4288        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
4289        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
4290        let sql = r#"WITH
4291base_orders AS (
4292    SELECT * FROM stg_orders
4293),
4294with_payments AS (
4295    SELECT
4296        base_orders.*,
4297        p.amount
4298    FROM base_orders
4299    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4300),
4301final_cte AS (
4302    SELECT * FROM with_payments
4303)
4304SELECT * FROM final_cte"#;
4305        let expr = parse(sql);
4306
4307        let mut schema = MappingSchema::new();
4308        let order_cols = vec![
4309            (
4310                "order_id".to_string(),
4311                crate::expressions::DataType::Unknown,
4312            ),
4313            (
4314                "customer_id".to_string(),
4315                crate::expressions::DataType::Unknown,
4316            ),
4317            ("status".to_string(), crate::expressions::DataType::Unknown),
4318        ];
4319        let pay_cols = vec![
4320            (
4321                "payment_id".to_string(),
4322                crate::expressions::DataType::Unknown,
4323            ),
4324            (
4325                "order_id".to_string(),
4326                crate::expressions::DataType::Unknown,
4327            ),
4328            ("amount".to_string(), crate::expressions::DataType::Unknown),
4329        ];
4330        schema.add_table("stg_orders", &order_cols, None).unwrap();
4331        schema.add_table("stg_payments", &pay_cols, None).unwrap();
4332
4333        // order_id should trace back to stg_orders
4334        let node =
4335            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4336                .unwrap();
4337        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4338
4339        // The leaf should be "stg_orders.order_id" (not just "order_id")
4340        let has_table_qualified = all_names
4341            .iter()
4342            .any(|n| n.contains('.') && n.contains("order_id"));
4343        assert!(
4344            has_table_qualified,
4345            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4346            all_names
4347        );
4348
4349        // amount should trace back to stg_payments
4350        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4351            .unwrap();
4352        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4353
4354        let has_table_qualified = all_names
4355            .iter()
4356            .any(|n| n.contains('.') && n.contains("amount"));
4357        assert!(
4358            has_table_qualified,
4359            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4360            all_names
4361        );
4362    }
4363
4364    #[test]
4365    fn test_lineage_cte_alias_resolution() {
4366        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
4367        let sql = r#"WITH import_stg_items AS (
4368    SELECT item_id, name, status FROM stg_items
4369)
4370SELECT base.item_id, base.status
4371FROM import_stg_items AS base"#;
4372        let expr = parse(sql);
4373
4374        let node = lineage("item_id", &expr, None, false).unwrap();
4375        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4376        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
4377        assert!(
4378            all_names.iter().any(|n| n == "stg_items.item_id"),
4379            "Expected leaf 'stg_items.item_id', got: {:?}",
4380            all_names
4381        );
4382    }
4383
4384    #[test]
4385    fn test_lineage_cte_alias_with_schema_and_star() {
4386        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
4387        let sql = r#"WITH import_stg AS (
4388    SELECT * FROM stg_items
4389)
4390SELECT base.item_id, base.status
4391FROM import_stg AS base"#;
4392        let expr = parse(sql);
4393
4394        let mut schema = MappingSchema::new();
4395        schema
4396            .add_table(
4397                "stg_items",
4398                &[
4399                    ("item_id".to_string(), DataType::Unknown),
4400                    ("name".to_string(), DataType::Unknown),
4401                    ("status".to_string(), DataType::Unknown),
4402                ],
4403                None,
4404            )
4405            .unwrap();
4406
4407        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
4408            .unwrap();
4409        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4410        assert!(
4411            all_names.iter().any(|n| n == "stg_items.item_id"),
4412            "Expected leaf 'stg_items.item_id', got: {:?}",
4413            all_names
4414        );
4415    }
4416
4417    #[test]
4418    fn test_lineage_cte_alias_with_join() {
4419        // Multiple CTE aliases in a JOIN: each should resolve independently
4420        let sql = r#"WITH
4421    import_users AS (SELECT id, name FROM users),
4422    import_orders AS (SELECT id, user_id, amount FROM orders)
4423SELECT u.name, o.amount
4424FROM import_users AS u
4425LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
4426        let expr = parse(sql);
4427
4428        let node = lineage("name", &expr, None, false).unwrap();
4429        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4430        assert!(
4431            all_names.iter().any(|n| n == "users.name"),
4432            "Expected leaf 'users.name', got: {:?}",
4433            all_names
4434        );
4435
4436        let node = lineage("amount", &expr, None, false).unwrap();
4437        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4438        assert!(
4439            all_names.iter().any(|n| n == "orders.amount"),
4440            "Expected leaf 'orders.amount', got: {:?}",
4441            all_names
4442        );
4443    }
4444
4445    // -----------------------------------------------------------------------
4446    // Quoted CTE name tests — verifying SQL identifier case semantics
4447    // -----------------------------------------------------------------------
4448
4449    #[test]
4450    fn test_lineage_unquoted_cte_case_insensitive() {
4451        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
4452        // MyCte and MYCTE should match.
4453        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
4454        let node = lineage("col", &expr, None, false).unwrap();
4455        assert_eq!(node.name, "col");
4456        assert!(
4457            !node.downstream.is_empty(),
4458            "Unquoted CTE should resolve case-insensitively"
4459        );
4460    }
4461
4462    #[test]
4463    fn test_lineage_quoted_cte_case_preserved() {
4464        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
4465        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
4466        let node = lineage("col", &expr, None, false).unwrap();
4467        assert_eq!(node.name, "col");
4468        assert!(
4469            !node.downstream.is_empty(),
4470            "Quoted CTE with matching case should resolve"
4471        );
4472    }
4473
4474    #[test]
4475    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
4476        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
4477        // sqlglot treats this as a table reference, not a CTE match.
4478        // Star expansion should NOT resolve through the CTE.
4479        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
4480        // lineage("col", ...) should fail because "mycte" is treated as an external
4481        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
4482        let result = lineage("col", &expr, None, false);
4483        assert!(
4484            result.is_err(),
4485            "Quoted CTE with case mismatch should not expand star: {:?}",
4486            result
4487        );
4488    }
4489
4490    #[test]
4491    fn test_lineage_mixed_quoted_unquoted_cte() {
4492        // Mix of unquoted and quoted CTEs in a nested chain.
4493        let expr = parse(
4494            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
4495        );
4496        let node = lineage("a", &expr, None, false).unwrap();
4497        assert_eq!(node.name, "a");
4498        assert!(
4499            !node.downstream.is_empty(),
4500            "Mixed quoted/unquoted CTE chain should resolve"
4501        );
4502    }
4503
4504    // -----------------------------------------------------------------------
4505    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
4506    // -----------------------------------------------------------------------
4507    //
4508    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
4509    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
4510    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
4511    // direct string comparison for CTE name matching, ignoring the quoted status.
4512    //
4513    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
4514    // and unquoted as case-insensitive. The scope system should do the same.
4515    //
4516    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
4517    // which is broader than the star expansion scope of this PR.
4518
4519    #[test]
4520    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
4521        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
4522        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
4523        // as "mycte" incorrectly resolves to the CTE.
4524        //
4525        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
4526        // case-sensitive: "mycte" should NOT match CTE "MyCte".
4527        //
4528        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4529        // this test should fail — update the assertion to match correct behavior:
4530        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
4531        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
4532        let node = lineage("col", &expr, None, false).unwrap();
4533        assert!(!node.downstream.is_empty());
4534        let child = &node.downstream[0];
4535        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
4536        assert_eq!(
4537            child.source_name, "MyCte",
4538            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4539             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4540        );
4541    }
4542
4543    #[test]
4544    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4545        // Known bug: same as above but with qualified column reference ("mycte".col).
4546        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
4547        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
4548        //
4549        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4550        // this test should fail — update to assert source_name != "MyCte".
4551        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4552        let node = lineage("col", &expr, None, false).unwrap();
4553        assert!(!node.downstream.is_empty());
4554        let child = &node.downstream[0];
4555        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
4556        assert_eq!(
4557            child.source_name, "MyCte",
4558            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4559             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4560        );
4561    }
4562
4563    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
4564
4565    /// sqlglot: test_node_name_doesnt_contain_comment
4566    /// Comments in column expressions should not affect lineage resolution.
4567    /// NOTE: This test uses SELECT * from a derived table, which is a separate
4568    /// known limitation in polyglot-sql (star expansion in subqueries).
4569    #[test]
4570    #[ignore = "requires derived table star expansion (separate issue)"]
4571    fn test_node_name_doesnt_contain_comment() {
4572        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4573        let node = lineage("x", &expr, None, false).unwrap();
4574
4575        assert_eq!(node.name, "x");
4576        assert!(!node.downstream.is_empty());
4577    }
4578
4579    /// A line comment between SELECT and the first column wraps the column
4580    /// in an Annotated node. Lineage must unwrap it to find the column name.
4581    /// Verify that commented and uncommented queries produce identical lineage.
4582    #[test]
4583    fn test_comment_before_first_column_in_cte() {
4584        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
4585        let sql_without_comment = "with t as (select 1 as a) select a from t";
4586
4587        // Without comment — baseline
4588        let expr_ok = parse(sql_without_comment);
4589        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4590
4591        // With comment — should produce identical lineage
4592        let expr_comment = parse(sql_with_comment);
4593        let node_comment = lineage("a", &expr_comment, None, false)
4594            .expect("with comment before first column should succeed");
4595
4596        assert_eq!(node_ok.name, node_comment.name, "node names should match");
4597        assert_eq!(
4598            node_ok.downstream_names(),
4599            node_comment.downstream_names(),
4600            "downstream lineage should be identical with or without comment"
4601        );
4602    }
4603
4604    /// Block comment between SELECT and first column.
4605    #[test]
4606    fn test_block_comment_before_first_column() {
4607        let sql = "with t as (select 1 as a) select /* section */ a from t";
4608        let expr = parse(sql);
4609        let node = lineage("a", &expr, None, false)
4610            .expect("block comment before first column should succeed");
4611        assert_eq!(node.name, "a");
4612        assert!(
4613            !node.downstream.is_empty(),
4614            "should have downstream lineage"
4615        );
4616    }
4617
4618    /// Comment before first column should not affect second column resolution.
4619    #[test]
4620    fn test_comment_before_first_column_second_col_ok() {
4621        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
4622        let expr = parse(sql);
4623
4624        let node_a =
4625            lineage("a", &expr, None, false).expect("column a with comment should succeed");
4626        assert_eq!(node_a.name, "a");
4627
4628        let node_b =
4629            lineage("b", &expr, None, false).expect("column b with comment should succeed");
4630        assert_eq!(node_b.name, "b");
4631    }
4632
4633    /// Aliased column with preceding comment.
4634    #[test]
4635    fn test_comment_before_aliased_column() {
4636        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
4637        let expr = parse(sql);
4638        let node =
4639            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4640        assert_eq!(node.name, "y");
4641        assert!(
4642            !node.downstream.is_empty(),
4643            "aliased column should have downstream lineage"
4644        );
4645    }
4646}