Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, NamedWindow, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
/// A node in the column lineage graph
///
/// Each node describes one step in tracing a column: the expression at that
/// step, the query context it appears in, and the child steps traced from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of this lineage step (e.g., "table.column")
    pub name: String,
    /// The expression at this node
    pub expression: Expression,
    /// The source expression (the full query context)
    pub source: Expression,
    /// Child lineage steps traced from this node; `get_source_tables` follows
    /// these edges to reach the source tables.
    pub downstream: Vec<LineageNode>,
    /// Optional source name (e.g., for derived tables); empty when unset.
    pub source_name: String,
    /// Semantic source kind for downstream consumers.
    /// `LineageNode::new` initializes this to `SourceKind::Unknown`.
    pub source_kind: SourceKind,
    /// User-written source alias when different from canonical source name.
    /// Omitted from serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source_alias: Option<String>,
    /// Optional reference node name (e.g., for CTEs); empty when unset.
    pub reference_node_name: String,
}
40
41impl LineageNode {
42    /// Create a new lineage node
43    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44        Self {
45            name: name.into(),
46            expression,
47            source,
48            downstream: Vec::new(),
49            source_name: String::new(),
50            source_kind: SourceKind::Unknown,
51            source_alias: None,
52            reference_node_name: String::new(),
53        }
54    }
55
56    /// Iterate over all nodes in the lineage graph using DFS
57    pub fn walk(&self) -> LineageWalker<'_> {
58        LineageWalker { stack: vec![self] }
59    }
60
61    /// Get all downstream column names
62    pub fn downstream_names(&self) -> Vec<String> {
63        self.downstream.iter().map(|n| n.name.clone()).collect()
64    }
65}
66
67fn source_kind_for_scope_context(
68    scope: &Scope,
69    source_name: &str,
70    reference_node_name: &str,
71) -> SourceKind {
72    if source_name.is_empty() && reference_node_name.is_empty() {
73        return SourceKind::Root;
74    }
75    if let Some(source_info) = scope.sources.get(source_name) {
76        return source_info.kind;
77    }
78    if scope.cte_sources.contains_key(source_name) {
79        return SourceKind::Cte;
80    }
81    match scope.scope_type {
82        ScopeType::Cte => SourceKind::Cte,
83        ScopeType::DerivedTable => SourceKind::DerivedTable,
84        ScopeType::Udtf => SourceKind::Virtual,
85        _ => SourceKind::Unknown,
86    }
87}
88
89fn apply_scope_context(
90    node: &mut LineageNode,
91    scope: &Scope,
92    source_name: &str,
93    reference_node_name: &str,
94) {
95    node.source_name = source_name.to_string();
96    node.reference_node_name = reference_node_name.to_string();
97    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
/// Iterator for walking the lineage graph
///
/// Created by `LineageNode::walk`. The `Iterator` impl pops the next node from
/// the end of `stack`, so traversal is depth-first.
pub struct LineageWalker<'a> {
    /// Nodes still to be visited; the last element is visited next.
    stack: Vec<&'a LineageNode>,
}
104
105impl<'a> Iterator for LineageWalker<'a> {
106    type Item = &'a LineageNode;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        if let Some(node) = self.stack.pop() {
110            // Add children in reverse order so they're visited in order
111            for child in node.downstream.iter().rev() {
112                self.stack.push(child);
113            }
114            Some(node)
115        } else {
116            None
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// ColumnRef: name or positional index for column lookup
123// ---------------------------------------------------------------------------
124
/// Column reference for lineage tracing — by name or positional index.
///
/// `lineage_from_expression` traces by `Name`;
/// `lineage_by_index_from_expression` traces by `Index`.
enum ColumnRef<'a> {
    /// Look the column up by its name.
    Name(&'a str),
    /// Look the column up by positional index.
    Index(usize),
}
130
131// ---------------------------------------------------------------------------
132// Public API
133// ---------------------------------------------------------------------------
134
135/// Build the lineage graph for a column in a SQL query
136///
137/// # Arguments
138/// * `column` - The column name to trace lineage for
139/// * `sql` - The SQL expression (SELECT, UNION, etc.)
140/// * `dialect` - Optional dialect for parsing
141/// * `trim_selects` - If true, trim the source SELECT to only include the target column
142///
143/// # Returns
144/// The root lineage node for the specified column
145///
146/// # Example
147/// ```ignore
148/// use polyglot_sql::lineage::lineage;
149/// use polyglot_sql::parse_one;
150/// use polyglot_sql::DialectType;
151///
152/// let sql = "SELECT a, b + 1 AS c FROM t";
153/// let expr = parse_one(sql, DialectType::Generic).unwrap();
154/// let node = lineage("c", &expr, None, false).unwrap();
155/// ```
156pub fn lineage(
157    column: &str,
158    sql: &Expression,
159    dialect: Option<DialectType>,
160    trim_selects: bool,
161) -> Result<LineageNode> {
162    let mut owned = lineage_normalized_expression(sql);
163    // Fast path: skip clone when there are no CTEs to expand
164    if has_lineage_with_clause(&owned) {
165        expand_cte_stars(&mut owned, None);
166    }
167    lineage_from_expression(column, &owned, dialect, trim_selects)
168}
169
170/// Build the lineage graph for a column in a SQL query using optional schema metadata.
171///
172/// When `schema` is provided, the query is first qualified with
173/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
174/// ambiguous column references.
175///
176/// # Arguments
177/// * `column` - The column name to trace lineage for
178/// * `sql` - The SQL expression (SELECT, UNION, etc.)
179/// * `schema` - Optional schema used for qualification
180/// * `dialect` - Optional dialect for qualification and lineage handling
181/// * `trim_selects` - If true, trim the source SELECT to only include the target column
182///
183/// # Returns
184/// The root lineage node for the specified column
185pub fn lineage_with_schema(
186    column: &str,
187    sql: &Expression,
188    schema: Option<&dyn Schema>,
189    dialect: Option<DialectType>,
190    trim_selects: bool,
191) -> Result<LineageNode> {
192    let normalized_expression = lineage_normalized_expression(sql);
193    let mut qualified_expression = if let Some(schema) = schema {
194        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
195            QualifyColumnsOptions::new().with_dialect(dialect_type)
196        } else {
197            QualifyColumnsOptions::new()
198        };
199
200        qualify_columns(normalized_expression.clone(), schema, &options).map_err(|e| {
201            Error::internal(format!("Lineage qualification failed with schema: {}", e))
202        })?
203    } else {
204        normalized_expression
205    };
206
207    // Annotate types in-place so lineage nodes carry type information
208    annotate_types(&mut qualified_expression, schema, dialect);
209
210    // Expand CTE stars on the already-owned expression (no extra clone).
211    // Pass schema so that stars from external tables can also be resolved.
212    expand_cte_stars(&mut qualified_expression, schema);
213
214    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
215}
216
217fn lineage_from_expression(
218    column: &str,
219    sql: &Expression,
220    dialect: Option<DialectType>,
221    trim_selects: bool,
222) -> Result<LineageNode> {
223    let scope = build_scope(sql);
224    to_node(
225        ColumnRef::Name(column),
226        &scope,
227        dialect,
228        "",
229        "",
230        "",
231        trim_selects,
232    )
233}
234
235pub(crate) fn lineage_by_index_from_expression(
236    column_index: usize,
237    sql: &Expression,
238    dialect: Option<DialectType>,
239    trim_selects: bool,
240) -> Result<LineageNode> {
241    let normalized = lineage_normalized_expression(sql);
242    let scope = build_scope(&normalized);
243    to_node(
244        ColumnRef::Index(column_index),
245        &scope,
246        dialect,
247        "",
248        "",
249        "",
250        trim_selects,
251    )
252}
253
254fn lineage_normalized_expression(sql: &Expression) -> Expression {
255    match sql {
256        Expression::Prepare(prepare) => lineage_normalized_expression(&prepare.statement),
257        Expression::CreateTable(create) => create
258            .as_select
259            .as_ref()
260            .map(|query| attach_with_to_query(query.clone(), create.with_cte.clone()))
261            .unwrap_or_else(|| sql.clone()),
262        Expression::CreateView(create) => lineage_normalized_expression(&create.query),
263        Expression::Insert(insert) => insert
264            .query
265            .as_ref()
266            .map(|query| attach_with_to_query(query.clone(), insert.with.clone()))
267            .unwrap_or_else(|| sql.clone()),
268        _ => sql.clone(),
269    }
270}
271
272fn attach_with_to_query(
273    mut query: Expression,
274    with: Option<crate::expressions::With>,
275) -> Expression {
276    if let Some(with) = with {
277        attach_with_to_query_mut(&mut query, with);
278    }
279    query
280}
281
282fn attach_with_to_query_mut(query: &mut Expression, with: crate::expressions::With) {
283    match query {
284        Expression::Select(select) => {
285            if select.with.is_none() {
286                select.with = Some(with);
287            }
288        }
289        Expression::Union(union) => {
290            if union.with.is_none() {
291                union.with = Some(with);
292            }
293        }
294        Expression::Intersect(intersect) => {
295            if intersect.with.is_none() {
296                intersect.with = Some(with);
297            }
298        }
299        Expression::Except(except) => {
300            if except.with.is_none() {
301                except.with = Some(with);
302            }
303        }
304        Expression::Paren(paren) => attach_with_to_query_mut(&mut paren.this, with),
305        _ => {}
306    }
307}
308
309fn has_lineage_with_clause(expr: &Expression) -> bool {
310    match expr {
311        Expression::Select(select) => select.with.is_some(),
312        Expression::Union(union) => {
313            union.with.is_some()
314                || has_lineage_with_clause(&union.left)
315                || has_lineage_with_clause(&union.right)
316        }
317        Expression::Intersect(intersect) => {
318            intersect.with.is_some()
319                || has_lineage_with_clause(&intersect.left)
320                || has_lineage_with_clause(&intersect.right)
321        }
322        Expression::Except(except) => {
323            except.with.is_some()
324                || has_lineage_with_clause(&except.left)
325                || has_lineage_with_clause(&except.right)
326        }
327        Expression::Paren(paren) => has_lineage_with_clause(&paren.this),
328        _ => false,
329    }
330}
331
332// ---------------------------------------------------------------------------
333// CTE star expansion
334// ---------------------------------------------------------------------------
335
336/// Normalize an identifier for CTE name matching.
337///
338/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
339/// quoted identifiers preserve their original case. This matches sqlglot's
340/// `normalize_identifiers` behavior.
341fn normalize_cte_name(ident: &Identifier) -> String {
342    if ident.quoted {
343        ident.name.clone()
344    } else {
345        ident.name.to_lowercase()
346    }
347}
348
349/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
350/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
351/// which qualify_columns cannot resolve because it processes each SELECT independently.
352///
353/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
354/// by looking up column names in the schema. This enables correct expansion of patterns
355/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
356///
357/// CTE name matching follows SQL identifier semantics: unquoted names are compared
358/// case-insensitively (lowercased), while quoted names preserve their original case.
359/// This matches sqlglot's `normalize_identifiers` behavior.
360pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
361    if let Expression::Prepare(prepare) = expr {
362        expand_cte_stars(&mut prepare.statement, schema);
363        return;
364    }
365
366    let select = match expr {
367        Expression::Select(s) => s,
368        _ => return,
369    };
370
371    let with = match &mut select.with {
372        Some(w) => w,
373        None => return,
374    };
375
376    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
377
378    for cte in &mut with.ctes {
379        let cte_name = normalize_cte_name(&cte.alias);
380
381        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
382        if !cte.columns.is_empty() {
383            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
384            resolved_cte_columns.insert(cte_name, cols);
385            continue;
386        }
387
388        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
389        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
390        // references itself. We detect this conservatively: if the CTE name appears as
391        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
392        // block are still expanded.
393        if with.recursive {
394            let is_self_referencing =
395                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
396                    let body_sources = get_select_sources(body_select);
397                    body_sources.iter().any(|s| s.normalized == cte_name)
398                } else {
399                    false
400                };
401            if is_self_referencing {
402                continue;
403            }
404        }
405
406        // Get the SELECT from the CTE body (handle UNION by taking left branch)
407        let body_select = match get_leftmost_select_mut(&mut cte.this) {
408            Some(s) => s,
409            None => continue,
410        };
411
412        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
413        resolved_cte_columns.insert(cte_name, columns);
414    }
415
416    // Also expand stars in the outer SELECT itself
417    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
418}
419
420/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
421///
422/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
423/// are determined by the left branch. This matches sqlglot's behavior.
424fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
425    let mut current = expr;
426    for _ in 0..MAX_LINEAGE_DEPTH {
427        match current {
428            Expression::Select(s) => return Some(s),
429            Expression::Union(u) => current = &mut u.left,
430            Expression::Intersect(i) => current = &mut i.left,
431            Expression::Except(e) => current = &mut e.left,
432            Expression::Paren(p) => current = &mut p.this,
433            _ => return None,
434        }
435    }
436    None
437}
438
439/// Rewrite star expressions in a SELECT using resolved CTE column lists.
440/// Falls back to `schema` for external table column lookup.
441/// Returns the list of output column names after expansion.
442fn rewrite_stars_in_select(
443    select: &mut Select,
444    resolved_ctes: &HashMap<String, Vec<String>>,
445    schema: Option<&dyn Schema>,
446) -> Vec<String> {
447    // The AST represents star expressions in two forms depending on syntax:
448    //   - `SELECT *`      → Expression::Star (unqualified star)
449    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
450    // Both must be checked to handle all star patterns.
451    let has_star = select
452        .expressions
453        .iter()
454        .any(|e| matches!(e, Expression::Star(_)));
455    let has_qualified_star = select
456        .expressions
457        .iter()
458        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
459
460    if !has_star && !has_qualified_star {
461        // No stars — just extract column names without rewriting
462        return select
463            .expressions
464            .iter()
465            .filter_map(get_expression_output_name)
466            .collect();
467    }
468
469    let sources = get_select_sources(select);
470    let mut new_expressions = Vec::new();
471    let mut result_columns = Vec::new();
472
473    for expr in &select.expressions {
474        match expr {
475            Expression::Star(star) => {
476                let qual = star.table.as_ref();
477                if let Some(expanded) =
478                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
479                {
480                    for (src_alias, col_name) in &expanded {
481                        let table_id = Identifier::new(src_alias);
482                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
483                        result_columns.push(col_name.clone());
484                    }
485                } else {
486                    new_expressions.push(expr.clone());
487                    result_columns.push("*".to_string());
488                }
489            }
490            Expression::Column(c) if c.name.name == "*" => {
491                let qual = c.table.as_ref();
492                if let Some(expanded) =
493                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
494                {
495                    for (_src_alias, col_name) in &expanded {
496                        // Keep the original table qualifier for qualified stars (table.*)
497                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
498                        result_columns.push(col_name.clone());
499                    }
500                } else {
501                    new_expressions.push(expr.clone());
502                    result_columns.push("*".to_string());
503                }
504            }
505            _ => {
506                new_expressions.push(expr.clone());
507                if let Some(name) = get_expression_output_name(expr) {
508                    result_columns.push(name);
509                }
510            }
511        }
512    }
513
514    select.expressions = new_expressions;
515    result_columns
516}
517
518/// Try to expand a star expression by looking up source columns from resolved CTEs,
519/// falling back to the schema for external tables.
520/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
521/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
522fn expand_star_from_sources(
523    qualifier: Option<&Identifier>,
524    sources: &[SourceInfo],
525    resolved_ctes: &HashMap<String, Vec<String>>,
526    schema: Option<&dyn Schema>,
527) -> Option<Vec<(String, String)>> {
528    let mut expanded = Vec::new();
529
530    if let Some(qual) = qualifier {
531        // Qualified star: table.*
532        let qual_normalized = normalize_cte_name(qual);
533        for src in sources {
534            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
535                // Try CTE first
536                if let Some(cols) = resolved_ctes.get(&src.normalized) {
537                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
538                    return Some(expanded);
539                }
540                // Fall back to schema
541                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
542                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
543                    return Some(expanded);
544                }
545            }
546        }
547        None
548    } else {
549        // Unqualified star: expand all sources.
550        // Intentionally conservative: if any source can't be resolved, the entire
551        // expansion is aborted. Partial expansion would produce an incomplete column
552        // list, causing downstream lineage resolution to silently omit columns.
553        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
554        let mut any_expanded = false;
555        for src in sources {
556            if let Some(cols) = resolved_ctes.get(&src.normalized) {
557                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
558                any_expanded = true;
559            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
560                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
561                any_expanded = true;
562            } else {
563                return None;
564            }
565        }
566        if any_expanded {
567            Some(expanded)
568        } else {
569            None
570        }
571    }
572}
573
574/// Look up column names for a table from the schema.
575fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
576    let schema = schema?;
577    if fq_name.is_empty() {
578        return None;
579    }
580    schema
581        .column_names(fq_name)
582        .ok()
583        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
584}
585
586/// Create a Column expression with the given name and optional table qualifier.
587fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
588    Expression::Column(Box::new(crate::expressions::Column {
589        name: Identifier::new(name),
590        table: table.cloned(),
591        join_mark: false,
592        trailing_comments: Vec::new(),
593        span: None,
594        inferred_type: None,
595    }))
596}
597
598/// Extract the output name of a SELECT expression.
599fn get_expression_output_name(expr: &Expression) -> Option<String> {
600    match expr {
601        Expression::Alias(a) => Some(a.alias.name.clone()),
602        Expression::Column(c) => Some(c.name.name.clone()),
603        Expression::Identifier(id) => Some(id.name.clone()),
604        Expression::Star(_) => Some("*".to_string()),
605        _ => None,
606    }
607}
608
/// One source of a SELECT (FROM item, JOIN target, or LATERAL VIEW), as seen
/// by CTE star expansion.
struct SourceInfo {
    /// Key for CTE lookup: unquoted → lowercased, quoted → as-is
    /// (subquery aliases are always lowercased).
    normalized: String,
    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
    fq_name: String,
    /// Name used to qualify this source's columns (its alias, else its own name).
    alias: String,
    /// Whether this source was introduced through a quoted identifier.
    ///
    /// The schema-less star passthrough heuristic must stay conservative for
    /// quoted sources because unresolved quoted table names can be distinct
    /// from similarly named CTEs that older scope paths still compare
    /// case-insensitively.
    quoted: bool,
}
624
625/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
626/// SELECT's FROM and JOIN clauses in a single pass.
627fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
628    let mut sources = Vec::new();
629
630    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
631        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
632            SourceInfo {
633                alias: alias.name.clone(),
634                quoted: alias.quoted,
635                normalized: normalize_cte_name(alias),
636                fq_name: alias.name.clone(),
637            }
638        }
639
640        fn named_virtual_source_info(alias: &str) -> SourceInfo {
641            SourceInfo {
642                alias: alias.to_string(),
643                quoted: false,
644                normalized: alias.to_lowercase(),
645                fq_name: alias.to_string(),
646            }
647        }
648
649        match expr {
650            Expression::Table(t) => {
651                let normalized = normalize_cte_name(&t.name);
652                let alias = t
653                    .alias
654                    .as_ref()
655                    .map(|a| a.name.clone())
656                    .unwrap_or_else(|| t.name.name.clone());
657                let mut parts = Vec::new();
658                if let Some(catalog) = &t.catalog {
659                    parts.push(catalog.name.clone());
660                }
661                if let Some(schema) = &t.schema {
662                    parts.push(schema.name.clone());
663                }
664                parts.push(t.name.name.clone());
665                let fq_name = parts.join(".");
666                Some(SourceInfo {
667                    alias,
668                    quoted: t.name.quoted,
669                    normalized,
670                    fq_name,
671                })
672            }
673            Expression::Subquery(s) => {
674                let alias_identifier = s.alias.as_ref()?;
675                let alias = alias_identifier.name.clone();
676                let normalized = alias.to_lowercase();
677                let fq_name = alias.clone();
678                Some(SourceInfo {
679                    alias,
680                    quoted: alias_identifier.quoted,
681                    normalized,
682                    fq_name,
683                })
684            }
685            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
686            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
687                Some(virtual_source_info(&a.alias))
688            }
689            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
690            Expression::LateralView(lateral_view) => lateral_view
691                .table_alias
692                .as_ref()
693                .or_else(|| lateral_view.column_aliases.first())
694                .map(virtual_source_info),
695            Expression::Pivot(pivot) => {
696                let alias = pivot_lineage_source_name(
697                    &pivot.this,
698                    pivot.alias.as_ref().map(|alias| alias.name.as_str()),
699                );
700                Some(SourceInfo {
701                    alias: alias.clone(),
702                    quoted: false,
703                    normalized: alias.to_lowercase(),
704                    fq_name: alias,
705                })
706            }
707            Expression::Unpivot(unpivot) => {
708                let alias = pivot_lineage_source_name(
709                    &unpivot.this,
710                    unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
711                );
712                Some(SourceInfo {
713                    alias: alias.clone(),
714                    quoted: false,
715                    normalized: alias.to_lowercase(),
716                    fq_name: alias,
717                })
718            }
719            Expression::Paren(p) => extract_source(&p.this),
720            _ => None,
721        }
722    }
723
724    if let Some(from) = &select.from {
725        for expr in &from.expressions {
726            if let Some(info) = extract_source(expr) {
727                sources.push(info);
728            }
729        }
730    }
731    for join in &select.joins {
732        if let Some(info) = extract_source(&join.this) {
733            sources.push(info);
734        }
735    }
736    for lateral_view in &select.lateral_views {
737        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
738        {
739            sources.push(info);
740        }
741    }
742    sources
743}
744
745fn pivot_lineage_source_name(source: &Expression, explicit_alias: Option<&str>) -> String {
746    if let Some(alias) = explicit_alias {
747        return alias.to_string();
748    }
749
750    match source {
751        Expression::Table(table) => table
752            .alias
753            .as_ref()
754            .map(|alias| alias.name.clone())
755            .unwrap_or_else(|| table.name.name.clone()),
756        Expression::Subquery(subquery) => subquery
757            .alias
758            .as_ref()
759            .map(|alias| alias.name.clone())
760            .unwrap_or_else(|| "_0".to_string()),
761        Expression::Paren(paren) => pivot_lineage_source_name(&paren.this, explicit_alias),
762        _ => "_0".to_string(),
763    }
764}
765
766/// Get all source tables from a lineage graph
767pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
768    let mut tables = HashSet::new();
769    collect_source_tables(node, &mut tables);
770    tables
771}
772
773/// Recursively collect source table names from lineage graph
774pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
775    if let Expression::Table(table) = &node.source {
776        tables.insert(table.name.name.clone());
777    }
778    for child in &node.downstream {
779        collect_source_tables(child, tables);
780    }
781}
782
783// ---------------------------------------------------------------------------
784// Core recursive lineage builder
785// ---------------------------------------------------------------------------
786
/// Maximum recursion depth for lineage tracing to prevent stack overflow
/// on circular or deeply nested CTE chains.
///
/// `to_node_inner` returns an error once its depth exceeds this value.
/// `get_leftmost_select_mut` also uses it as the maximum number of steps when
/// drilling into set operations.
const MAX_LINEAGE_DEPTH: usize = 64;
790
791/// Recursively build a lineage node for a column in a scope.
792fn to_node(
793    column: ColumnRef<'_>,
794    scope: &Scope,
795    dialect: Option<DialectType>,
796    scope_name: &str,
797    source_name: &str,
798    reference_node_name: &str,
799    trim_selects: bool,
800) -> Result<LineageNode> {
801    to_node_inner(
802        column,
803        scope,
804        dialect,
805        scope_name,
806        source_name,
807        reference_node_name,
808        trim_selects,
809        &[],
810        0,
811    )
812}
813
/// Recursive worker behind `to_node`.
///
/// Builds the lineage node for `column` in `scope` and attaches downstream
/// nodes for the expressions it depends on:
/// * set operations (possibly wrapped in a CTE) are delegated to
///   `handle_set_operation`;
/// * `*` projections get one child per matching entry of `scope.sources`;
/// * unaliased scalar subqueries in the projection are traced through the
///   matching `scope.subquery_scopes` entry (output column 0);
/// * every column reference in the projection is resolved through
///   `resolve_qualified_column` / `resolve_unqualified_column`.
///
/// `source_name` and `reference_node_name` are recorded on the new node via
/// `apply_scope_context`. `ancestor_cte_scopes` carries CTE scopes from
/// enclosing queries so CTEs defined in an outer WITH clause can be found.
/// `depth` is the current recursion depth.
///
/// # Errors
/// Returns an error when `depth` exceeds `MAX_LINEAGE_DEPTH`, when
/// `find_select_expr` fails to locate the column, or when
/// `handle_set_operation` fails. Failures while tracing scalar subqueries are
/// not propagated; those children are simply omitted.
fn to_node_inner(
    column: ColumnRef<'_>,
    scope: &Scope,
    dialect: Option<DialectType>,
    scope_name: &str,
    source_name: &str,
    reference_node_name: &str,
    trim_selects: bool,
    ancestor_cte_scopes: &[Scope],
    depth: usize,
) -> Result<LineageNode> {
    if depth > MAX_LINEAGE_DEPTH {
        return Err(Error::internal(format!(
            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
        )));
    }
    let scope_expr = &scope.expression;

    // Build combined CTE scopes: current scope's cte_scopes + ancestors
    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
    for s in ancestor_cte_scopes {
        all_cte_scopes.push(s);
    }
    // Owned copies of the combined list with the current scope filtered out;
    // these are passed to set-operation branches and scalar subqueries.
    let descendant_cte_scopes = descendant_cte_scope_clones(&all_cte_scopes, scope);

    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
    //    but we need the inner query (SELECT/UNION) for column lookup.
    let effective_expr = match scope_expr {
        Expression::Cte(cte) => &cte.this,
        other => other,
    };

    // 1. Set operations (UNION / INTERSECT / EXCEPT)
    if matches!(
        effective_expr,
        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
    ) {
        // For CTE wrapping a set op, create a temporary scope with the inner expression
        // (copying the child-scope lists so the branches can still be found).
        if matches!(scope_expr, Expression::Cte(_)) {
            let mut inner_scope = Scope::new(effective_expr.clone());
            inner_scope.union_scopes = scope.union_scopes.clone();
            inner_scope.sources = scope.sources.clone();
            inner_scope.cte_sources = scope.cte_sources.clone();
            inner_scope.cte_scopes = scope.cte_scopes.clone();
            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
            return handle_set_operation(
                &column,
                &inner_scope,
                dialect,
                scope_name,
                source_name,
                reference_node_name,
                trim_selects,
                &descendant_cte_scopes,
                depth,
            );
        }
        return handle_set_operation(
            &column,
            scope,
            dialect,
            scope_name,
            source_name,
            reference_node_name,
            trim_selects,
            &descendant_cte_scopes,
            depth,
        );
    }

    // 2. Find the select expression for this column
    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
    let column_name = resolve_column_name(&column, &select_expr);

    // 3. Trim source if requested
    let node_source = if trim_selects {
        trim_source(effective_expr, &select_expr)
    } else {
        effective_expr.clone()
    };

    // 4. Create the lineage node
    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
    apply_scope_context(&mut node, scope, source_name, reference_node_name);

    // 5. Star handling — add downstream for each source
    //    A qualified star (`t.*`) only keeps sources whose key, alias, or
    //    table name equals `t` (ASCII case-insensitive); an unqualified star
    //    keeps every source. Star nodes are terminal: no column tracing follows.
    if let Expression::Star(star) = &select_expr {
        let star_table = star
            .table
            .as_ref()
            .map(|identifier| identifier.name.as_str());
        for (name, source_info) in &scope.sources {
            if let Some(star_table) = star_table {
                let table_matches = name.eq_ignore_ascii_case(star_table)
                    || source_info
                        .alias
                        .as_deref()
                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
                    || matches!(
                        &source_info.expression,
                        Expression::Table(table_ref)
                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
                    );
                if !table_matches {
                    continue;
                }
            }

            let mut child = LineageNode::new(
                format!("{}.*", name),
                Expression::Star(crate::expressions::Star {
                    table: star.table.clone(),
                    except: None,
                    replace: None,
                    rename: None,
                    trailing_comments: vec![],
                    span: None,
                }),
                source_info.expression.clone(),
            );
            apply_source_info_context(&mut child, name, source_info);
            node.downstream.push(child);
        }
        return Ok(node);
    }

    // 6. Subqueries in select — trace through scalar subqueries
    //    Each unaliased subquery is matched to its scope by expression equality
    //    and traced via its first output column; errors are ignored.
    let subqueries: Vec<&Expression> =
        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
    for sq_expr in subqueries {
        if let Expression::Subquery(sq) = sq_expr {
            for sq_scope in &scope.subquery_scopes {
                if sq_scope.expression == sq.this {
                    if let Ok(child) = to_node_inner(
                        ColumnRef::Index(0),
                        sq_scope,
                        dialect,
                        &column_name,
                        "",
                        "",
                        trim_selects,
                        &descendant_cte_scopes,
                        depth + 1,
                    ) {
                        node.downstream.push(child);
                    }
                    break;
                }
            }
        }
    }

    // 7. Column references — trace each column to its source
    //    NOTE(review): unlike steps 1 and 6, this passes `all_cte_scopes`
    //    unfiltered, so a CTE scope that arrived via `ancestor_cte_scopes` can
    //    see itself during resolution. Confirm this is intended for recursive
    //    CTEs (recursion is still bounded by MAX_LINEAGE_DEPTH).
    let col_refs = find_column_refs_in_expr_with_select(&select_expr, effective_expr, dialect);
    for col_ref in col_refs {
        let col_name = &col_ref.column;
        if let Some(ref table_id) = col_ref.table {
            let tbl = &table_id.name;
            resolve_qualified_column(
                &mut node,
                scope,
                dialect,
                tbl,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        } else {
            resolve_unqualified_column(
                &mut node,
                scope,
                dialect,
                col_name,
                &column_name,
                trim_selects,
                &all_cte_scopes,
                depth,
            );
        }
    }

    Ok(node)
}
1000
1001fn descendant_cte_scope_clones(all_cte_scopes: &[&Scope], current_scope: &Scope) -> Vec<Scope> {
1002    all_cte_scopes
1003        .iter()
1004        .filter(|scope| scope.expression != current_scope.expression)
1005        .map(|scope| (*scope).clone())
1006        .collect()
1007}
1008
1009// ---------------------------------------------------------------------------
1010// Set operation handling
1011// ---------------------------------------------------------------------------
1012
1013fn handle_set_operation(
1014    column: &ColumnRef<'_>,
1015    scope: &Scope,
1016    dialect: Option<DialectType>,
1017    scope_name: &str,
1018    source_name: &str,
1019    reference_node_name: &str,
1020    trim_selects: bool,
1021    ancestor_cte_scopes: &[Scope],
1022    depth: usize,
1023) -> Result<LineageNode> {
1024    let scope_expr = &scope.expression;
1025
1026    // Determine column index
1027    let col_index = match column {
1028        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
1029        ColumnRef::Index(i) => *i,
1030    };
1031
1032    let col_name = match column {
1033        ColumnRef::Name(name) => name.to_string(),
1034        ColumnRef::Index(_) => format!("_{col_index}"),
1035    };
1036
1037    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
1038    apply_scope_context(&mut node, scope, source_name, reference_node_name);
1039
1040    // Recurse into each union branch
1041    for branch_scope in &scope.union_scopes {
1042        if let Ok(child) = to_node_inner(
1043            ColumnRef::Index(col_index),
1044            branch_scope,
1045            dialect,
1046            scope_name,
1047            "",
1048            "",
1049            trim_selects,
1050            ancestor_cte_scopes,
1051            depth + 1,
1052        ) {
1053            node.downstream.push(child);
1054        }
1055    }
1056
1057    Ok(node)
1058}
1059
1060// ---------------------------------------------------------------------------
1061// Column resolution helpers
1062// ---------------------------------------------------------------------------
1063
/// Resolve a column reference qualified with `table` in `scope` and attach the
/// resulting lineage node(s) to `node`.
///
/// Resolution order (each step returns as soon as it attaches something):
/// 1. PIVOT / UNPIVOT sources: attach the pivot inputs that feed `col_name`.
/// 2. CTE references (found in `scope.cte_sources` or among `all_cte_scopes`):
///    recurse into the CTE's scope; if that fails, attach a terminal node for
///    a source whose kind is `SourceKind::Cte`.
/// 3. Derived tables (`is_scope` sources keyed by `table`): recurse into the
///    child scope.
/// 4. Other sources keyed by `table`: attach a terminal table-column node,
///    plus the inputs of virtual sources.
/// 5. Otherwise: attach an unresolved terminal node for `table` / `col_name`.
///
/// `table` may be a FROM alias of a CTE; `resolve_cte_alias` maps it back to the
/// CTE name. `parent_name` becomes the scope and reference-node name of
/// recursive children. Errors from recursive calls are swallowed and the next
/// fallback step is tried.
fn resolve_qualified_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    table: &str,
    col_name: &str,
    parent_name: &str,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
    let resolved_cte_name = resolve_cte_alias(scope, table);
    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);

    // Step 1: PIVOT / UNPIVOT sources. If the helper reports the column was not
    // handled (returns false), fall through to the generic steps below.
    if let Some(source_info) = scope
        .sources
        .get(table)
        .or_else(|| scope.sources.get(effective_table))
    {
        match &source_info.expression {
            Expression::Pivot(pivot) => {
                if attach_pivot_dependencies(
                    node,
                    scope,
                    dialect,
                    pivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            Expression::Unpivot(unpivot) => {
                if attach_unpivot_dependencies(
                    node,
                    scope,
                    dialect,
                    unpivot,
                    col_name,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                ) {
                    return;
                }
            }
            _ => {}
        }
    }

    // Check if table is a CTE reference — check both the current scope's cte_sources
    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
    let is_cte = scope.cte_sources.contains_key(effective_table)
        || all_cte_scopes.iter().any(
            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
        );
    if is_cte {
        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
            if let Ok(child) = to_node_inner(
                ColumnRef::Name(col_name),
                child_scope,
                dialect,
                parent_name,
                effective_table,
                parent_name,
                trim_selects,
                &ancestors,
                depth + 1,
            ) {
                node.downstream.push(child);
                return;
            }
        }

        // Fallback when the CTE scope is missing or tracing into it failed:
        // record a terminal node, but only for sources registered as CTEs.
        if let Some(source_info) = scope
            .sources
            .get(table)
            .or_else(|| scope.sources.get(effective_table))
            .filter(|source_info| source_info.kind == SourceKind::Cte)
        {
            node.downstream.push(make_table_column_node_from_source(
                effective_table,
                col_name,
                source_info,
            ));
            return;
        }
    }

    // Check if table is a derived table (is_scope = true in sources)
    if let Some(source_info) = scope.sources.get(table) {
        if source_info.is_scope {
            if let Some(child_scope) = find_child_scope(scope, table) {
                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
                if let Ok(child) = to_node_inner(
                    ColumnRef::Name(col_name),
                    child_scope,
                    dialect,
                    parent_name,
                    table,
                    parent_name,
                    trim_selects,
                    &ancestors,
                    depth + 1,
                ) {
                    node.downstream.push(child);
                    return;
                }
            }
        }
    }

    // Base table source found in current scope: preserve alias in the display name
    // but store the resolved table expression and name for downstream consumers.
    if let Some(source_info) = scope.sources.get(table) {
        if !source_info.is_scope {
            let mut child = make_table_column_node_from_source(table, col_name, source_info);
            // Virtual sources additionally get the columns their defining
            // expression reads attached beneath the terminal node.
            if source_info.kind == SourceKind::Virtual {
                attach_virtual_source_dependencies(
                    &mut child,
                    scope,
                    dialect,
                    table,
                    &source_info.expression,
                    trim_selects,
                    all_cte_scopes,
                    depth,
                );
            }
            node.downstream.push(child);
            return;
        }
    }

    // Base table or unresolved — terminal node
    node.downstream
        .push(make_table_column_node(table, col_name));
}
1208
1209fn attach_pivot_dependencies(
1210    node: &mut LineageNode,
1211    scope: &Scope,
1212    dialect: Option<DialectType>,
1213    pivot: &crate::expressions::Pivot,
1214    col_name: &str,
1215    trim_selects: bool,
1216    all_cte_scopes: &[&Scope],
1217    depth: usize,
1218) -> bool {
1219    if pivot.unpivot {
1220        return false;
1221    }
1222
1223    let mapping = pivot_column_mapping(pivot, dialect);
1224    let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1225        if pivot_implicit_source_column(pivot, col_name) {
1226            let col_ref = SimpleColumnRef {
1227                table: None,
1228                column: col_name.to_string(),
1229            };
1230            attach_pivot_input_column(
1231                node,
1232                scope,
1233                dialect,
1234                &pivot.this,
1235                &col_ref,
1236                trim_selects,
1237                all_cte_scopes,
1238                depth,
1239            );
1240            return true;
1241        }
1242        return false;
1243    };
1244
1245    for col_ref in input_columns {
1246        attach_pivot_input_column(
1247            node,
1248            scope,
1249            dialect,
1250            &pivot.this,
1251            col_ref,
1252            trim_selects,
1253            all_cte_scopes,
1254            depth,
1255        );
1256    }
1257    true
1258}
1259
1260fn attach_unpivot_dependencies(
1261    node: &mut LineageNode,
1262    scope: &Scope,
1263    dialect: Option<DialectType>,
1264    unpivot: &crate::expressions::Unpivot,
1265    col_name: &str,
1266    trim_selects: bool,
1267    all_cte_scopes: &[&Scope],
1268    depth: usize,
1269) -> bool {
1270    let mapping = unpivot_column_mapping(unpivot, dialect);
1271    let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1272        return false;
1273    };
1274
1275    for col_ref in input_columns {
1276        attach_pivot_input_column(
1277            node,
1278            scope,
1279            dialect,
1280            &unpivot.this,
1281            col_ref,
1282            trim_selects,
1283            all_cte_scopes,
1284            depth,
1285        );
1286    }
1287    true
1288}
1289
1290fn pivot_column_mapping(
1291    pivot: &crate::expressions::Pivot,
1292    dialect: Option<DialectType>,
1293) -> HashMap<String, Vec<SimpleColumnRef>> {
1294    let fields = pivot_field_output_names(pivot);
1295    if fields.is_empty() {
1296        return HashMap::new();
1297    }
1298
1299    let mut mapping = HashMap::new();
1300    for (agg_index, agg) in pivot.expressions.iter().enumerate() {
1301        let input_columns = find_column_refs_in_expr(agg, dialect);
1302        if input_columns.is_empty() {
1303            continue;
1304        }
1305        let agg_name = pivot_aggregation_name(agg);
1306        for field in &fields {
1307            let mut output_names = Vec::new();
1308            if pivot.expressions.len() == 1 {
1309                output_names.push(field.clone());
1310            }
1311            if let Some(agg_name) = &agg_name {
1312                output_names.push(format!("{field}_{agg_name}"));
1313                output_names.push(format!("{agg_name}_{field}"));
1314            }
1315            if pivot.expressions.len() > 1 && agg_name.is_none() {
1316                output_names.push(format!("{}_{}", field, agg_index));
1317            }
1318
1319            for output_name in output_names {
1320                mapping.insert(
1321                    normalize_column_name(&output_name, dialect),
1322                    input_columns.clone(),
1323                );
1324            }
1325        }
1326    }
1327    mapping
1328}
1329
1330fn pivot_field_output_names(pivot: &crate::expressions::Pivot) -> Vec<String> {
1331    let mut names = Vec::new();
1332    for field in &pivot.fields {
1333        if let Expression::In(in_expr) = field {
1334            for expr in &in_expr.expressions {
1335                if let Some(name) = pivot_expr_output_name(expr) {
1336                    names.push(name);
1337                }
1338            }
1339        }
1340    }
1341    names
1342}
1343
1344fn pivot_aggregation_name(expr: &Expression) -> Option<String> {
1345    match expr {
1346        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1347        _ => get_alias_or_name(expr),
1348    }
1349}
1350
1351fn pivot_expr_output_name(expr: &Expression) -> Option<String> {
1352    match expr {
1353        Expression::PivotAlias(alias) => pivot_expr_output_name(&alias.alias),
1354        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1355        Expression::Identifier(identifier) => Some(identifier.name.clone()),
1356        Expression::Column(column) => Some(column.name.name.clone()),
1357        Expression::Literal(literal) => Some(literal.value_str().to_string()),
1358        Expression::Var(var) => Some(var.this.clone()),
1359        Expression::Tuple(tuple) => tuple.expressions.first().and_then(pivot_expr_output_name),
1360        _ => None,
1361    }
1362}
1363
1364fn pivot_implicit_source_column(pivot: &crate::expressions::Pivot, col_name: &str) -> bool {
1365    let pivot_columns: HashSet<String> = pivot
1366        .fields
1367        .iter()
1368        .filter_map(|field| match field {
1369            Expression::In(in_expr) => Some(&in_expr.this),
1370            _ => None,
1371        })
1372        .flat_map(|expr| find_column_refs_in_expr(expr, None))
1373        .map(|col| col.column.to_lowercase())
1374        .collect();
1375    let aggregation_columns: HashSet<String> = pivot
1376        .expressions
1377        .iter()
1378        .flat_map(|expr| find_column_refs_in_expr(expr, None))
1379        .map(|col| col.column.to_lowercase())
1380        .collect();
1381
1382    let normalized = col_name.to_lowercase();
1383    !pivot_columns.contains(&normalized) && !aggregation_columns.contains(&normalized)
1384}
1385
1386fn unpivot_column_mapping(
1387    unpivot: &crate::expressions::Unpivot,
1388    dialect: Option<DialectType>,
1389) -> HashMap<String, Vec<SimpleColumnRef>> {
1390    let value_columns: Vec<String> = std::iter::once(unpivot.value_column.name.clone())
1391        .chain(
1392            unpivot
1393                .extra_value_columns
1394                .iter()
1395                .map(|column| column.name.clone()),
1396        )
1397        .collect();
1398    let mut all_input_columns = Vec::new();
1399    let mut value_input_columns: Vec<Vec<SimpleColumnRef>> = vec![Vec::new(); value_columns.len()];
1400
1401    for entry in &unpivot.columns {
1402        let columns = unpivot_entry_columns(entry);
1403        all_input_columns.extend(columns.clone());
1404        if columns.len() == value_columns.len() {
1405            for (idx, col_ref) in columns.into_iter().enumerate() {
1406                value_input_columns[idx].push(col_ref);
1407            }
1408        } else {
1409            for inputs in &mut value_input_columns {
1410                inputs.extend(columns.clone());
1411            }
1412        }
1413    }
1414
1415    let mut mapping = HashMap::new();
1416    mapping.insert(
1417        normalize_column_name(&unpivot.name_column.name, dialect),
1418        all_input_columns.clone(),
1419    );
1420    for (idx, value_column) in value_columns.iter().enumerate() {
1421        mapping.insert(
1422            normalize_column_name(value_column, dialect),
1423            value_input_columns.get(idx).cloned().unwrap_or_default(),
1424        );
1425    }
1426    mapping
1427}
1428
1429fn unpivot_entry_columns(expr: &Expression) -> Vec<SimpleColumnRef> {
1430    match expr {
1431        Expression::PivotAlias(alias) => unpivot_entry_columns(&alias.this),
1432        Expression::Tuple(tuple) => tuple
1433            .expressions
1434            .iter()
1435            .flat_map(unpivot_entry_columns)
1436            .collect(),
1437        Expression::Column(column) => vec![SimpleColumnRef {
1438            table: column.table.clone(),
1439            column: column.name.name.clone(),
1440        }],
1441        Expression::Identifier(identifier) => vec![SimpleColumnRef {
1442            table: None,
1443            column: identifier.name.clone(),
1444        }],
1445        _ => find_column_refs_in_expr(expr, None),
1446    }
1447}
1448
/// Attach one PIVOT / UNPIVOT input column (`col_ref`) to `node`, resolving it
/// against the pivot's input expression `source_expr`.
///
/// * Table input: a name found in `scope.cte_sources` is resolved through
///   `resolve_qualified_column`. Any other table yields a terminal column node
///   keyed by the table's alias, or its name when unaliased.
/// * Subquery input: a fresh scope is built for the subquery and the column is
///   traced inside it. Qualified references go through
///   `resolve_qualified_column`, unqualified ones through `to_node_inner`.
///   Tracing errors are ignored.
/// * Parenthesized input: unwrapped and handled recursively.
/// * Anything else: qualified references are resolved in `scope`; unqualified
///   ones become a terminal node with the placeholder table name `"_"`.
fn attach_pivot_input_column(
    node: &mut LineageNode,
    scope: &Scope,
    dialect: Option<DialectType>,
    source_expr: &Expression,
    col_ref: &SimpleColumnRef,
    trim_selects: bool,
    all_cte_scopes: &[&Scope],
    depth: usize,
) {
    match source_expr {
        Expression::Table(table) => {
            // Prefer the reference's own qualifier when deciding whether it is a CTE.
            let table_name = col_ref
                .table
                .as_ref()
                .map(|identifier| identifier.name.as_str())
                .unwrap_or(table.name.name.as_str());
            if scope.cte_sources.contains_key(table_name) {
                resolve_qualified_column(
                    node,
                    scope,
                    dialect,
                    table_name,
                    &col_ref.column,
                    &node.name.clone(),
                    trim_selects,
                    all_cte_scopes,
                    depth + 1,
                );
            } else {
                // Non-CTE table: the node is keyed by the input table's alias/name;
                // `col_ref.table` is not used for the key on this path.
                let mut source = ScopeSourceInfo::new(
                    Expression::Table(Box::new(table.as_ref().clone())),
                    false,
                    SourceKind::Table,
                );
                if let Some(alias) = &table.alias {
                    source = source.with_alias(alias.name.clone());
                }
                let source_key = table
                    .alias
                    .as_ref()
                    .map(|alias| alias.name.as_str())
                    .unwrap_or(table.name.name.as_str());
                node.downstream.push(make_table_column_node_from_source(
                    source_key,
                    &col_ref.column,
                    &source,
                ));
            }
        }
        Expression::Subquery(subquery) => {
            // The subquery is not necessarily a registered child scope here,
            // so a scope is built for it on the fly.
            let source_scope = build_scope(&subquery.this);
            let child = if let Some(table) = &col_ref.table {
                let mut child_node = LineageNode::new(
                    &col_ref.column,
                    subquery.this.clone(),
                    subquery.this.clone(),
                );
                resolve_qualified_column(
                    &mut child_node,
                    &source_scope,
                    dialect,
                    &table.name,
                    &col_ref.column,
                    &node.name.clone(),
                    trim_selects,
                    all_cte_scopes,
                    depth + 1,
                );
                Ok(child_node)
            } else {
                to_node_inner(
                    ColumnRef::Name(&col_ref.column),
                    &source_scope,
                    dialect,
                    "",
                    "",
                    "",
                    trim_selects,
                    &all_cte_scopes
                        .iter()
                        .map(|scope| (*scope).clone())
                        .collect::<Vec<_>>(),
                    depth + 1,
                )
            };
            if let Ok(child) = child {
                node.downstream.push(child);
            }
        }
        Expression::Paren(paren) => attach_pivot_input_column(
            node,
            scope,
            dialect,
            &paren.this,
            col_ref,
            trim_selects,
            all_cte_scopes,
            depth,
        ),
        _ => {
            if let Some(table) = &col_ref.table {
                resolve_qualified_column(
                    node,
                    scope,
                    dialect,
                    &table.name,
                    &col_ref.column,
                    &node.name.clone(),
                    trim_selects,
                    all_cte_scopes,
                    depth + 1,
                );
            } else {
                // No qualifier and no recognizable input: "_" is a placeholder table name.
                node.downstream
                    .push(make_table_column_node("_", &col_ref.column));
            }
        }
    }
}
1569
1570/// Resolve a FROM alias to the original CTE name.
1571///
1572/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
1573/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
1574/// This function checks if `name` is such an alias and returns the CTE name.
1575fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1576    // If it's already a known CTE name, no resolution needed
1577    if scope.cte_sources.contains_key(name) {
1578        return None;
1579    }
1580    // Check if the source's expression is a CTE — if so, extract the CTE name
1581    if let Some(source_info) = scope.sources.get(name) {
1582        if source_info.is_scope {
1583            if let Expression::Cte(cte) = &source_info.expression {
1584                let cte_name = &cte.alias.name;
1585                if scope.cte_sources.contains_key(cte_name) {
1586                    return Some(cte_name.clone());
1587                }
1588            }
1589        }
1590    }
1591    None
1592}
1593
1594fn resolve_unqualified_column(
1595    node: &mut LineageNode,
1596    scope: &Scope,
1597    dialect: Option<DialectType>,
1598    col_name: &str,
1599    parent_name: &str,
1600    trim_selects: bool,
1601    all_cte_scopes: &[&Scope],
1602    depth: usize,
1603) {
1604    // Try to find which source this column belongs to.
1605    // Build the source list from the actual FROM/JOIN clauses to avoid
1606    // mixing in CTE definitions that are in scope but not referenced.
1607    let from_source_names = source_names_from_from_join(scope);
1608
1609    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1610        resolve_qualified_column(
1611            node,
1612            scope,
1613            dialect,
1614            &tbl,
1615            col_name,
1616            parent_name,
1617            trim_selects,
1618            all_cte_scopes,
1619            depth,
1620        );
1621        return;
1622    }
1623
1624    if from_source_names.len() == 1 {
1625        let tbl = &from_source_names[0];
1626        resolve_qualified_column(
1627            node,
1628            scope,
1629            dialect,
1630            tbl,
1631            col_name,
1632            parent_name,
1633            trim_selects,
1634            all_cte_scopes,
1635            depth,
1636        );
1637        return;
1638    }
1639
1640    // Multiple sources — can't resolve without schema info, add unqualified node
1641    let child = LineageNode::new(
1642        col_name.to_string(),
1643        Expression::Column(Box::new(crate::expressions::Column {
1644            name: crate::expressions::Identifier::new(col_name.to_string()),
1645            table: None,
1646            join_mark: false,
1647            trailing_comments: vec![],
1648            span: None,
1649            inferred_type: None,
1650        })),
1651        node.source.clone(),
1652    );
1653    node.downstream.push(child);
1654}
1655
1656fn unique_virtual_source_for_column(
1657    scope: &Scope,
1658    source_names: &[String],
1659    col_name: &str,
1660) -> Option<String> {
1661    let mut matches = source_names.iter().filter_map(|source_name| {
1662        let source = scope.sources.get(source_name)?;
1663        if source.kind == SourceKind::Virtual
1664            && virtual_source_output_columns(source)
1665                .any(|column| column.eq_ignore_ascii_case(col_name))
1666        {
1667            Some(source_name.clone())
1668        } else {
1669            None
1670        }
1671    });
1672
1673    let first = matches.next()?;
1674    if matches.next().is_none() {
1675        Some(first)
1676    } else {
1677        None
1678    }
1679}
1680
1681fn virtual_source_output_columns(
1682    source_info: &ScopeSourceInfo,
1683) -> Box<dyn Iterator<Item = String> + '_> {
1684    match &source_info.expression {
1685        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1686        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1687            Box::new(alias_output_columns(alias))
1688        }
1689        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1690        Expression::LateralView(lateral_view) => {
1691            Box::new(lateral_view_output_columns(lateral_view))
1692        }
1693        _ => Box::new(source_info.alias.clone().into_iter()),
1694    }
1695}
1696
1697fn unnest_output_columns(
1698    unnest: &crate::expressions::UnnestFunc,
1699) -> impl Iterator<Item = String> + '_ {
1700    unnest
1701        .alias
1702        .iter()
1703        .map(|alias| alias.name.clone())
1704        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1705}
1706
1707fn alias_output_columns(
1708    alias: &crate::expressions::Alias,
1709) -> Box<dyn Iterator<Item = String> + '_> {
1710    if alias.column_aliases.is_empty() {
1711        Box::new(std::iter::once(alias.alias.name.clone()))
1712    } else {
1713        Box::new(
1714            alias
1715                .column_aliases
1716                .iter()
1717                .map(|column| column.name.clone()),
1718        )
1719    }
1720}
1721
1722fn lateral_output_columns(
1723    lateral: &crate::expressions::Lateral,
1724) -> Box<dyn Iterator<Item = String> + '_> {
1725    if lateral.column_aliases.is_empty() {
1726        default_virtual_output_columns(&lateral.this)
1727    } else {
1728        Box::new(lateral.column_aliases.iter().cloned())
1729    }
1730}
1731
1732fn lateral_view_output_columns(
1733    lateral_view: &crate::expressions::LateralView,
1734) -> Box<dyn Iterator<Item = String> + '_> {
1735    Box::new(
1736        lateral_view
1737            .column_aliases
1738            .iter()
1739            .map(|column| column.name.clone()),
1740    )
1741}
1742
1743fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1744    match expr {
1745        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1746        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1747            alias_output_columns(alias)
1748        }
1749        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1750            Box::new(
1751                ["seq", "key", "path", "index", "value", "this"]
1752                    .into_iter()
1753                    .map(String::from),
1754            )
1755        }
1756        _ => Box::new(std::iter::empty()),
1757    }
1758}
1759
1760fn attach_virtual_source_dependencies(
1761    node: &mut LineageNode,
1762    scope: &Scope,
1763    dialect: Option<DialectType>,
1764    source_alias: &str,
1765    source_expr: &Expression,
1766    trim_selects: bool,
1767    all_cte_scopes: &[&Scope],
1768    depth: usize,
1769) {
1770    let parent_name = node.name.clone();
1771    let mut seen = HashSet::new();
1772    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1773        let key = (
1774            col_ref.table.as_ref().map(|t| t.name.clone()),
1775            col_ref.column.clone(),
1776        );
1777        if !seen.insert(key) {
1778            continue;
1779        }
1780
1781        if let Some(table_id) = col_ref.table {
1782            let table = table_id.name;
1783            if table == source_alias {
1784                continue;
1785            }
1786            resolve_qualified_column(
1787                node,
1788                scope,
1789                dialect,
1790                &table,
1791                &col_ref.column,
1792                &parent_name,
1793                trim_selects,
1794                all_cte_scopes,
1795                depth + 1,
1796            );
1797        } else {
1798            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1799            if non_virtual_sources.len() == 1 {
1800                resolve_qualified_column(
1801                    node,
1802                    scope,
1803                    dialect,
1804                    &non_virtual_sources[0],
1805                    &col_ref.column,
1806                    &parent_name,
1807                    trim_selects,
1808                    all_cte_scopes,
1809                    depth + 1,
1810                );
1811            }
1812        }
1813    }
1814}
1815
1816fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1817    fn source_name(expr: &Expression) -> Option<String> {
1818        match expr {
1819            Expression::Table(table) => Some(
1820                table
1821                    .alias
1822                    .as_ref()
1823                    .map(|a| a.name.clone())
1824                    .unwrap_or_else(|| table.name.name.clone()),
1825            ),
1826            Expression::Subquery(subquery) => {
1827                subquery.alias.as_ref().map(|alias| alias.name.clone())
1828            }
1829            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1830            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1831                Some(alias.alias.name.clone())
1832            }
1833            Expression::Lateral(lateral) => lateral.alias.clone(),
1834            Expression::LateralView(lateral_view) => lateral_view
1835                .table_alias
1836                .as_ref()
1837                .or_else(|| lateral_view.column_aliases.first())
1838                .map(|alias| alias.name.clone()),
1839            Expression::Pivot(pivot) => Some(pivot_lineage_source_name(
1840                &pivot.this,
1841                pivot.alias.as_ref().map(|alias| alias.name.as_str()),
1842            )),
1843            Expression::Unpivot(unpivot) => Some(pivot_lineage_source_name(
1844                &unpivot.this,
1845                unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
1846            )),
1847            Expression::Paren(paren) => source_name(&paren.this),
1848            _ => None,
1849        }
1850    }
1851
1852    let effective_expr = match &scope.expression {
1853        Expression::Cte(cte) => &cte.this,
1854        expr => expr,
1855    };
1856
1857    let mut names = Vec::new();
1858    let mut seen = std::collections::HashSet::new();
1859
1860    if let Expression::Select(select) = effective_expr {
1861        if let Some(from) = &select.from {
1862            for expr in &from.expressions {
1863                if let Some(name) = source_name(expr) {
1864                    if !name.is_empty() && seen.insert(name.clone()) {
1865                        names.push(name);
1866                    }
1867                }
1868            }
1869        }
1870        for join in &select.joins {
1871            if let Some(name) = source_name(&join.this) {
1872                if !name.is_empty() && seen.insert(name.clone()) {
1873                    names.push(name);
1874                }
1875            }
1876        }
1877        for lateral_view in &select.lateral_views {
1878            if let Some(name) =
1879                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
1880            {
1881                if !name.is_empty() && seen.insert(name.clone()) {
1882                    names.push(name);
1883                }
1884            }
1885        }
1886    }
1887
1888    names
1889}
1890
1891fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1892    source_names_from_from_join(scope)
1893        .into_iter()
1894        .filter(|name| {
1895            !matches!(
1896                scope.sources.get(name).map(|source| source.kind),
1897                Some(SourceKind::Virtual)
1898            )
1899        })
1900        .collect()
1901}
1902
1903// ---------------------------------------------------------------------------
1904// Helper functions
1905// ---------------------------------------------------------------------------
1906
1907/// Get the alias or name of an expression
1908fn get_alias_or_name(expr: &Expression) -> Option<String> {
1909    match expr {
1910        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1911        Expression::Column(col) => Some(col.name.name.clone()),
1912        Expression::Identifier(id) => Some(id.name.clone()),
1913        Expression::Star(_) => Some("*".to_string()),
1914        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1915        // Unwrap to get the actual column/alias name from the inner expression.
1916        Expression::Annotated(a) => get_alias_or_name(&a.this),
1917        _ => None,
1918    }
1919}
1920
1921/// Resolve the display name for a column reference.
1922fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1923    match column {
1924        ColumnRef::Name(n) => n.to_string(),
1925        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1926    }
1927}
1928
1929/// Find the select expression matching a column reference.
1930fn find_select_expr(
1931    scope_expr: &Expression,
1932    column: &ColumnRef<'_>,
1933    dialect: Option<DialectType>,
1934) -> Result<Expression> {
1935    if let Expression::Select(ref select) = scope_expr {
1936        match column {
1937            ColumnRef::Name(name) => {
1938                let normalized_name = normalize_column_name(name, dialect);
1939                for expr in &select.expressions {
1940                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1941                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1942                            return Ok(expr.clone());
1943                        }
1944                    }
1945                }
1946                if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
1947                    return Ok(expr);
1948                }
1949                Err(crate::error::Error::parse(
1950                    format!("Cannot find column '{}' in query", name),
1951                    0,
1952                    0,
1953                    0,
1954                    0,
1955                ))
1956            }
1957            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1958                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1959            }),
1960        }
1961    } else {
1962        Err(crate::error::Error::parse(
1963            "Expected SELECT expression for column lookup",
1964            0,
1965            0,
1966            0,
1967            0,
1968        ))
1969    }
1970}
1971
1972fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
1973    let sources = get_select_sources(select);
1974    if sources.is_empty() {
1975        return None;
1976    }
1977
1978    let mut candidate_aliases = Vec::new();
1979    let mut seen = HashSet::new();
1980
1981    for expr in &select.expressions {
1982        let aliases = match star_passthrough_source_aliases(expr, &sources) {
1983            StarPassthroughSources::None => continue,
1984            StarPassthroughSources::Ambiguous => return None,
1985            StarPassthroughSources::Aliases(aliases) => aliases,
1986        };
1987
1988        for alias in aliases {
1989            if seen.insert(alias.clone()) {
1990                candidate_aliases.push(alias);
1991            }
1992        }
1993    }
1994
1995    match candidate_aliases.as_slice() {
1996        [alias] => {
1997            let table = Identifier::new(alias.clone());
1998            Some(make_column_expr(name, Some(&table)))
1999        }
2000        _ => None,
2001    }
2002}
2003
/// What a single projection contributes when resolving a column through a
/// star (`*` / `t.*`) passthrough.
enum StarPassthroughSources {
    /// The projection is a star that expands from the source(s) with these aliases.
    Aliases(Vec<String>),
    /// The star could expand from more than one source, so no single source can be chosen.
    Ambiguous,
    /// The projection is not a star, or it matches no usable source.
    None,
}
2009
2010fn star_passthrough_source_aliases(
2011    expr: &Expression,
2012    sources: &[SourceInfo],
2013) -> StarPassthroughSources {
2014    match expr {
2015        Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
2016        Expression::Column(column) if column.name.name == "*" => {
2017            star_source_aliases(column.table.as_ref(), sources)
2018        }
2019        Expression::Annotated(annotated) => {
2020            star_passthrough_source_aliases(&annotated.this, sources)
2021        }
2022        _ => StarPassthroughSources::None,
2023    }
2024}
2025
2026fn star_source_aliases(
2027    qualifier: Option<&Identifier>,
2028    sources: &[SourceInfo],
2029) -> StarPassthroughSources {
2030    if let Some(qualifier) = qualifier {
2031        let mut aliases = Vec::new();
2032
2033        for source in sources {
2034            if source_matches_star_qualifier(source, qualifier) {
2035                aliases.push(source.alias.clone());
2036            }
2037        }
2038
2039        return match aliases.len() {
2040            0 => StarPassthroughSources::None,
2041            1 => StarPassthroughSources::Aliases(aliases),
2042            _ => StarPassthroughSources::Ambiguous,
2043        };
2044    }
2045
2046    match sources {
2047        // Do not synthesize a source column for unresolved quoted table stars.
2048        // This keeps quoted CTE/table case semantics intact while still allowing
2049        // the schema-less fallback for common unquoted SELECT * passthroughs.
2050        [source] if source.quoted => StarPassthroughSources::None,
2051        [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
2052        [] => StarPassthroughSources::None,
2053        _ => StarPassthroughSources::Ambiguous,
2054    }
2055}
2056
2057fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
2058    if source.normalized == normalize_cte_name(qualifier) {
2059        return true;
2060    }
2061
2062    if qualifier.quoted {
2063        source.alias == qualifier.name
2064    } else {
2065        source.alias.eq_ignore_ascii_case(&qualifier.name)
2066    }
2067}
2068
2069/// Find the positional index of a column name in a set operation's first SELECT branch.
2070fn column_to_index(
2071    set_op_expr: &Expression,
2072    name: &str,
2073    dialect: Option<DialectType>,
2074) -> Result<usize> {
2075    let normalized_name = normalize_column_name(name, dialect);
2076    let mut expr = set_op_expr;
2077    loop {
2078        match expr {
2079            Expression::Union(u) => expr = &u.left,
2080            Expression::Intersect(i) => expr = &i.left,
2081            Expression::Except(e) => expr = &e.left,
2082            Expression::Select(select) => {
2083                for (i, e) in select.expressions.iter().enumerate() {
2084                    if let Some(alias_or_name) = get_alias_or_name(e) {
2085                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2086                            return Ok(i);
2087                        }
2088                    }
2089                }
2090                return Err(crate::error::Error::parse(
2091                    format!("Cannot find column '{}' in set operation", name),
2092                    0,
2093                    0,
2094                    0,
2095                    0,
2096                ));
2097            }
2098            _ => {
2099                return Err(crate::error::Error::parse(
2100                    "Expected SELECT or set operation",
2101                    0,
2102                    0,
2103                    0,
2104                    0,
2105                ))
2106            }
2107        }
2108    }
2109}
2110
2111fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
2112    normalize_name(name, dialect, false, true)
2113}
2114
2115/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
2116fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
2117    if let Expression::Select(select) = select_expr {
2118        let mut trimmed = select.as_ref().clone();
2119        trimmed.expressions = vec![target_expr.clone()];
2120        Expression::Select(Box::new(trimmed))
2121    } else {
2122        select_expr.clone()
2123    }
2124}
2125
2126/// Find the child scope (CTE or derived table) for a given source name.
2127fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
2128    // Check CTE scopes
2129    if scope.cte_sources.contains_key(source_name) {
2130        for cte_scope in &scope.cte_scopes {
2131            if let Expression::Cte(cte) = &cte_scope.expression {
2132                if cte.alias.name == source_name {
2133                    return Some(cte_scope);
2134                }
2135            }
2136        }
2137    }
2138
2139    // Check derived table scopes
2140    if let Some(source_info) = scope.sources.get(source_name) {
2141        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
2142            if let Expression::Subquery(sq) = &source_info.expression {
2143                for dt_scope in &scope.derived_table_scopes {
2144                    if dt_scope.expression == sq.this {
2145                        return Some(dt_scope);
2146                    }
2147                }
2148            }
2149        }
2150    }
2151
2152    None
2153}
2154
2155/// Find a CTE scope by name, searching through a combined list of CTE scopes.
2156/// This handles nested CTEs where the current scope doesn't have the CTE scope
2157/// as a direct child but knows about it via cte_sources.
2158fn find_child_scope_in<'a>(
2159    all_cte_scopes: &[&'a Scope],
2160    scope: &'a Scope,
2161    source_name: &str,
2162) -> Option<&'a Scope> {
2163    // First try the scope's own cte_scopes
2164    for cte_scope in &scope.cte_scopes {
2165        if let Expression::Cte(cte) = &cte_scope.expression {
2166            if cte.alias.name == source_name {
2167                return Some(cte_scope);
2168            }
2169        }
2170    }
2171
2172    // Then search through all ancestor CTE scopes
2173    for cte_scope in all_cte_scopes {
2174        if let Expression::Cte(cte) = &cte_scope.expression {
2175            if cte.alias.name == source_name {
2176                return Some(cte_scope);
2177            }
2178        }
2179    }
2180
2181    // Fall back to derived table scopes
2182    if let Some(source_info) = scope.sources.get(source_name) {
2183        if source_info.is_scope {
2184            if let Expression::Subquery(sq) = &source_info.expression {
2185                for dt_scope in &scope.derived_table_scopes {
2186                    if dt_scope.expression == sq.this {
2187                        return Some(dt_scope);
2188                    }
2189                }
2190            }
2191        }
2192    }
2193
2194    None
2195}
2196
2197/// Create a terminal lineage node for a table.column reference.
2198fn make_table_column_node(table: &str, column: &str) -> LineageNode {
2199    let mut node = LineageNode::new(
2200        format!("{}.{}", table, column),
2201        Expression::Column(Box::new(crate::expressions::Column {
2202            name: crate::expressions::Identifier::new(column.to_string()),
2203            table: Some(crate::expressions::Identifier::new(table.to_string())),
2204            join_mark: false,
2205            trailing_comments: vec![],
2206            span: None,
2207            inferred_type: None,
2208        })),
2209        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
2210    );
2211    node.source_name = table.to_string();
2212    node.source_kind = SourceKind::Table;
2213    node
2214}
2215
2216fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
2217    let mut parts: Vec<String> = Vec::new();
2218    if let Some(catalog) = &table_ref.catalog {
2219        parts.push(catalog.name.clone());
2220    }
2221    if let Some(schema) = &table_ref.schema {
2222        parts.push(schema.name.clone());
2223    }
2224    parts.push(table_ref.name.name.clone());
2225    parts.join(".")
2226}
2227
2228fn apply_source_info_context(
2229    node: &mut LineageNode,
2230    source_key: &str,
2231    source_info: &ScopeSourceInfo,
2232) {
2233    node.source_kind = source_info.kind;
2234    node.source_name =
2235        source_info
2236            .lineage_name
2237            .clone()
2238            .unwrap_or_else(|| match &source_info.expression {
2239                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
2240                _ => source_key.to_string(),
2241            });
2242    node.source_alias = source_info.alias.clone();
2243}
2244
2245fn make_table_column_node_from_source(
2246    source_key: &str,
2247    column: &str,
2248    source_info: &ScopeSourceInfo,
2249) -> LineageNode {
2250    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
2251    let mut node = LineageNode::new(
2252        format!("{}.{}", lineage_name, column),
2253        Expression::Column(Box::new(crate::expressions::Column {
2254            name: crate::expressions::Identifier::new(column.to_string()),
2255            table: Some(crate::expressions::Identifier::new(
2256                lineage_name.to_string(),
2257            )),
2258            join_mark: false,
2259            trailing_comments: vec![],
2260            span: None,
2261            inferred_type: None,
2262        })),
2263        source_info.expression.clone(),
2264    );
2265
2266    apply_source_info_context(&mut node, source_key, source_info);
2267
2268    node
2269}
2270
2271/// Simple column reference extracted from an expression
2272#[derive(Debug, Clone)]
2273struct SimpleColumnRef {
2274    table: Option<crate::expressions::Identifier>,
2275    column: String,
2276}
2277
2278/// Find all column references in an expression (does not recurse into subqueries).
2279fn find_column_refs_in_expr(
2280    expr: &Expression,
2281    dialect: Option<DialectType>,
2282) -> Vec<SimpleColumnRef> {
2283    let mut refs = Vec::new();
2284    collect_column_refs(expr, dialect, &mut refs, None);
2285    refs
2286}
2287
2288fn find_column_refs_in_expr_with_select(
2289    expr: &Expression,
2290    select_expr: &Expression,
2291    dialect: Option<DialectType>,
2292) -> Vec<SimpleColumnRef> {
2293    let named_windows = match select_expr {
2294        Expression::Select(select) => select.windows.as_deref(),
2295        _ => None,
2296    };
2297    let mut refs = Vec::new();
2298    collect_column_refs(expr, dialect, &mut refs, named_windows);
2299    refs
2300}
2301
2302fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
2303    match expr {
2304        Expression::Column(col) => {
2305            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
2306        }
2307        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
2308        _ => false,
2309    }
2310}
2311
2312fn collect_column_refs(
2313    expr: &Expression,
2314    dialect: Option<DialectType>,
2315    refs: &mut Vec<SimpleColumnRef>,
2316    named_windows: Option<&[NamedWindow]>,
2317) {
2318    let mut stack: Vec<&Expression> = vec![expr];
2319
2320    while let Some(current) = stack.pop() {
2321        match current {
2322            // === Leaf: collect Column references ===
2323            Expression::Column(col) => {
2324                refs.push(SimpleColumnRef {
2325                    table: col.table.clone(),
2326                    column: col.name.name.clone(),
2327                });
2328            }
2329
2330            // === Boundary: don't recurse into subqueries (handled separately) ===
2331            Expression::Subquery(_) | Expression::Exists(_) => {}
2332
2333            // === BinaryOp variants: left, right ===
2334            Expression::And(op)
2335            | Expression::Or(op)
2336            | Expression::Eq(op)
2337            | Expression::Neq(op)
2338            | Expression::Lt(op)
2339            | Expression::Lte(op)
2340            | Expression::Gt(op)
2341            | Expression::Gte(op)
2342            | Expression::Add(op)
2343            | Expression::Sub(op)
2344            | Expression::Mul(op)
2345            | Expression::Div(op)
2346            | Expression::Mod(op)
2347            | Expression::BitwiseAnd(op)
2348            | Expression::BitwiseOr(op)
2349            | Expression::BitwiseXor(op)
2350            | Expression::BitwiseLeftShift(op)
2351            | Expression::BitwiseRightShift(op)
2352            | Expression::Concat(op)
2353            | Expression::Adjacent(op)
2354            | Expression::TsMatch(op)
2355            | Expression::PropertyEQ(op)
2356            | Expression::ArrayContainsAll(op)
2357            | Expression::ArrayContainedBy(op)
2358            | Expression::ArrayOverlaps(op)
2359            | Expression::JSONBContainsAllTopKeys(op)
2360            | Expression::JSONBContainsAnyTopKeys(op)
2361            | Expression::JSONBDeleteAtPath(op)
2362            | Expression::ExtendsLeft(op)
2363            | Expression::ExtendsRight(op)
2364            | Expression::Is(op)
2365            | Expression::MemberOf(op)
2366            | Expression::NullSafeEq(op)
2367            | Expression::NullSafeNeq(op)
2368            | Expression::Glob(op)
2369            | Expression::Match(op) => {
2370                stack.push(&op.left);
2371                stack.push(&op.right);
2372            }
2373
2374            // === UnaryOp variants: this ===
2375            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
2376                stack.push(&u.this);
2377            }
2378
2379            // === UnaryFunc variants: this ===
2380            Expression::Upper(f)
2381            | Expression::Lower(f)
2382            | Expression::Length(f)
2383            | Expression::LTrim(f)
2384            | Expression::RTrim(f)
2385            | Expression::Reverse(f)
2386            | Expression::Abs(f)
2387            | Expression::Sqrt(f)
2388            | Expression::Cbrt(f)
2389            | Expression::Ln(f)
2390            | Expression::Exp(f)
2391            | Expression::Sign(f)
2392            | Expression::Date(f)
2393            | Expression::Time(f)
2394            | Expression::DateFromUnixDate(f)
2395            | Expression::UnixDate(f)
2396            | Expression::UnixSeconds(f)
2397            | Expression::UnixMillis(f)
2398            | Expression::UnixMicros(f)
2399            | Expression::TimeStrToDate(f)
2400            | Expression::DateToDi(f)
2401            | Expression::DiToDate(f)
2402            | Expression::TsOrDiToDi(f)
2403            | Expression::TsOrDsToDatetime(f)
2404            | Expression::TsOrDsToTimestamp(f)
2405            | Expression::YearOfWeek(f)
2406            | Expression::YearOfWeekIso(f)
2407            | Expression::Initcap(f)
2408            | Expression::Ascii(f)
2409            | Expression::Chr(f)
2410            | Expression::Soundex(f)
2411            | Expression::ByteLength(f)
2412            | Expression::Hex(f)
2413            | Expression::LowerHex(f)
2414            | Expression::Unicode(f)
2415            | Expression::Radians(f)
2416            | Expression::Degrees(f)
2417            | Expression::Sin(f)
2418            | Expression::Cos(f)
2419            | Expression::Tan(f)
2420            | Expression::Asin(f)
2421            | Expression::Acos(f)
2422            | Expression::Atan(f)
2423            | Expression::IsNan(f)
2424            | Expression::IsInf(f)
2425            | Expression::ArrayLength(f)
2426            | Expression::ArraySize(f)
2427            | Expression::Cardinality(f)
2428            | Expression::ArrayReverse(f)
2429            | Expression::ArrayDistinct(f)
2430            | Expression::ArrayFlatten(f)
2431            | Expression::ArrayCompact(f)
2432            | Expression::Explode(f)
2433            | Expression::ExplodeOuter(f)
2434            | Expression::ToArray(f)
2435            | Expression::MapFromEntries(f)
2436            | Expression::MapKeys(f)
2437            | Expression::MapValues(f)
2438            | Expression::JsonArrayLength(f)
2439            | Expression::JsonKeys(f)
2440            | Expression::JsonType(f)
2441            | Expression::ParseJson(f)
2442            | Expression::ToJson(f)
2443            | Expression::Typeof(f)
2444            | Expression::BitwiseCount(f)
2445            | Expression::Year(f)
2446            | Expression::Month(f)
2447            | Expression::Day(f)
2448            | Expression::Hour(f)
2449            | Expression::Minute(f)
2450            | Expression::Second(f)
2451            | Expression::DayOfWeek(f)
2452            | Expression::DayOfWeekIso(f)
2453            | Expression::DayOfMonth(f)
2454            | Expression::DayOfYear(f)
2455            | Expression::WeekOfYear(f)
2456            | Expression::Quarter(f)
2457            | Expression::Epoch(f)
2458            | Expression::EpochMs(f)
2459            | Expression::TimeStrToUnix(f)
2460            | Expression::SHA(f)
2461            | Expression::SHA1Digest(f)
2462            | Expression::TimeToUnix(f)
2463            | Expression::JSONBool(f)
2464            | Expression::Int64(f)
2465            | Expression::MD5NumberLower64(f)
2466            | Expression::MD5NumberUpper64(f)
2467            | Expression::DateStrToDate(f)
2468            | Expression::DateToDateStr(f) => {
2469                stack.push(&f.this);
2470            }
2471
2472            // === BinaryFunc variants: this, expression ===
2473            Expression::Power(f)
2474            | Expression::NullIf(f)
2475            | Expression::IfNull(f)
2476            | Expression::Nvl(f)
2477            | Expression::UnixToTimeStr(f)
2478            | Expression::Contains(f)
2479            | Expression::StartsWith(f)
2480            | Expression::EndsWith(f)
2481            | Expression::Levenshtein(f)
2482            | Expression::ModFunc(f)
2483            | Expression::Atan2(f)
2484            | Expression::IntDiv(f)
2485            | Expression::AddMonths(f)
2486            | Expression::MonthsBetween(f)
2487            | Expression::NextDay(f)
2488            | Expression::ArrayContains(f)
2489            | Expression::ArrayPosition(f)
2490            | Expression::ArrayAppend(f)
2491            | Expression::ArrayPrepend(f)
2492            | Expression::ArrayUnion(f)
2493            | Expression::ArrayExcept(f)
2494            | Expression::ArrayRemove(f)
2495            | Expression::StarMap(f)
2496            | Expression::MapFromArrays(f)
2497            | Expression::MapContainsKey(f)
2498            | Expression::ElementAt(f)
2499            | Expression::JsonMergePatch(f)
2500            | Expression::JSONBContains(f)
2501            | Expression::JSONBExtract(f) => {
2502                stack.push(&f.this);
2503                stack.push(&f.expression);
2504            }
2505
2506            // === VarArgFunc variants: expressions ===
2507            Expression::Greatest(f)
2508            | Expression::Least(f)
2509            | Expression::Coalesce(f)
2510            | Expression::ArrayConcat(f)
2511            | Expression::ArrayIntersect(f)
2512            | Expression::ArrayZip(f)
2513            | Expression::MapConcat(f)
2514            | Expression::JsonArray(f) => {
2515                for e in &f.expressions {
2516                    stack.push(e);
2517                }
2518            }
2519
2520            // === AggFunc variants: this, filter, having_max, limit ===
2521            Expression::Sum(f)
2522            | Expression::Avg(f)
2523            | Expression::Min(f)
2524            | Expression::Max(f)
2525            | Expression::ArrayAgg(f)
2526            | Expression::CountIf(f)
2527            | Expression::Stddev(f)
2528            | Expression::StddevPop(f)
2529            | Expression::StddevSamp(f)
2530            | Expression::Variance(f)
2531            | Expression::VarPop(f)
2532            | Expression::VarSamp(f)
2533            | Expression::Median(f)
2534            | Expression::Mode(f)
2535            | Expression::First(f)
2536            | Expression::Last(f)
2537            | Expression::AnyValue(f)
2538            | Expression::ApproxDistinct(f)
2539            | Expression::ApproxCountDistinct(f)
2540            | Expression::LogicalAnd(f)
2541            | Expression::LogicalOr(f)
2542            | Expression::Skewness(f)
2543            | Expression::ArrayConcatAgg(f)
2544            | Expression::ArrayUniqueAgg(f)
2545            | Expression::BoolXorAgg(f)
2546            | Expression::BitwiseAndAgg(f)
2547            | Expression::BitwiseOrAgg(f)
2548            | Expression::BitwiseXorAgg(f) => {
2549                stack.push(&f.this);
2550                if let Some(ref filter) = f.filter {
2551                    stack.push(filter);
2552                }
2553                if let Some((ref expr, _)) = f.having_max {
2554                    stack.push(expr);
2555                }
2556                if let Some(ref limit) = f.limit {
2557                    stack.push(limit);
2558                }
2559            }
2560
2561            // === Generic Function / AggregateFunction: args ===
2562            Expression::Function(func) => {
2563                for arg in &func.args {
2564                    stack.push(arg);
2565                }
2566            }
2567            Expression::AggregateFunction(func) => {
2568                for arg in &func.args {
2569                    stack.push(arg);
2570                }
2571                if let Some(ref filter) = func.filter {
2572                    stack.push(filter);
2573                }
2574                if let Some(ref limit) = func.limit {
2575                    stack.push(limit);
2576                }
2577            }
2578
2579            // === WindowFunction: this (skip Over for lineage purposes) ===
2580            Expression::WindowFunction(wf) => {
2581                stack.push(&wf.this);
2582                for e in &wf.over.partition_by {
2583                    stack.push(e);
2584                }
2585                for e in &wf.over.order_by {
2586                    stack.push(&e.this);
2587                }
2588                if let Some(keep) = &wf.keep {
2589                    for e in &keep.order_by {
2590                        stack.push(&e.this);
2591                    }
2592                }
2593                if let (Some(window_name), Some(named_windows)) =
2594                    (&wf.over.window_name, named_windows)
2595                {
2596                    for named_window in named_windows {
2597                        if named_window
2598                            .name
2599                            .name
2600                            .eq_ignore_ascii_case(&window_name.name)
2601                        {
2602                            for e in &named_window.spec.partition_by {
2603                                stack.push(e);
2604                            }
2605                            for e in &named_window.spec.order_by {
2606                                stack.push(&e.this);
2607                            }
2608                        }
2609                    }
2610                }
2611            }
2612
2613            // === Containers and special expressions ===
2614            Expression::Alias(a) => {
2615                stack.push(&a.this);
2616            }
2617            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
2618                stack.push(&c.this);
2619                if let Some(ref fmt) = c.format {
2620                    stack.push(fmt);
2621                }
2622                if let Some(ref def) = c.default {
2623                    stack.push(def);
2624                }
2625            }
2626            Expression::Paren(p) => {
2627                stack.push(&p.this);
2628            }
2629            Expression::Annotated(a) => {
2630                stack.push(&a.this);
2631            }
2632            Expression::Case(case) => {
2633                if let Some(ref operand) = case.operand {
2634                    stack.push(operand);
2635                }
2636                for (cond, result) in &case.whens {
2637                    stack.push(cond);
2638                    stack.push(result);
2639                }
2640                if let Some(ref else_expr) = case.else_ {
2641                    stack.push(else_expr);
2642                }
2643            }
2644            Expression::Collation(c) => {
2645                stack.push(&c.this);
2646            }
2647            Expression::In(i) => {
2648                stack.push(&i.this);
2649                for e in &i.expressions {
2650                    stack.push(e);
2651                }
2652                if let Some(ref q) = i.query {
2653                    stack.push(q);
2654                }
2655                if let Some(ref u) = i.unnest {
2656                    stack.push(u);
2657                }
2658            }
2659            Expression::Between(b) => {
2660                stack.push(&b.this);
2661                stack.push(&b.low);
2662                stack.push(&b.high);
2663            }
2664            Expression::IsNull(n) => {
2665                stack.push(&n.this);
2666            }
2667            Expression::IsTrue(t) | Expression::IsFalse(t) => {
2668                stack.push(&t.this);
2669            }
2670            Expression::IsJson(j) => {
2671                stack.push(&j.this);
2672            }
2673            Expression::Like(l) | Expression::ILike(l) => {
2674                stack.push(&l.left);
2675                stack.push(&l.right);
2676                if let Some(ref esc) = l.escape {
2677                    stack.push(esc);
2678                }
2679            }
2680            Expression::SimilarTo(s) => {
2681                stack.push(&s.this);
2682                stack.push(&s.pattern);
2683                if let Some(ref esc) = s.escape {
2684                    stack.push(esc);
2685                }
2686            }
2687            Expression::Ordered(o) => {
2688                stack.push(&o.this);
2689            }
2690            Expression::Array(a) => {
2691                for e in &a.expressions {
2692                    stack.push(e);
2693                }
2694            }
2695            Expression::Tuple(t) => {
2696                for e in &t.expressions {
2697                    stack.push(e);
2698                }
2699            }
2700            Expression::Struct(s) => {
2701                for (_, e) in &s.fields {
2702                    stack.push(e);
2703                }
2704            }
2705            Expression::Subscript(s) => {
2706                stack.push(&s.this);
2707                stack.push(&s.index);
2708            }
2709            Expression::Dot(d) => {
2710                stack.push(&d.this);
2711            }
2712            Expression::MethodCall(m) => {
2713                if !matches!(dialect, Some(DialectType::BigQuery))
2714                    || !is_bigquery_safe_namespace_receiver(&m.this)
2715                {
2716                    stack.push(&m.this);
2717                }
2718                for arg in &m.args {
2719                    stack.push(arg);
2720                }
2721            }
2722            Expression::ArraySlice(s) => {
2723                stack.push(&s.this);
2724                if let Some(ref start) = s.start {
2725                    stack.push(start);
2726                }
2727                if let Some(ref end) = s.end {
2728                    stack.push(end);
2729                }
2730            }
2731            Expression::Lambda(l) => {
2732                stack.push(&l.body);
2733            }
2734            Expression::NamedArgument(n) => {
2735                stack.push(&n.value);
2736            }
2737            Expression::Lateral(l) => {
2738                stack.push(&l.this);
2739                if let Some(ref view) = l.view {
2740                    stack.push(view);
2741                }
2742                if let Some(ref outer) = l.outer {
2743                    stack.push(outer);
2744                }
2745                if let Some(ref ordinality) = l.ordinality {
2746                    stack.push(ordinality);
2747                }
2748            }
2749            Expression::LateralView(lv) => {
2750                stack.push(&lv.this);
2751            }
2752            Expression::TryCatch(t) => {
2753                for stmt in &t.try_body {
2754                    stack.push(stmt);
2755                }
2756                if let Some(catch_body) = &t.catch_body {
2757                    for stmt in catch_body {
2758                        stack.push(stmt);
2759                    }
2760                }
2761            }
2762            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
2763                stack.push(e);
2764            }
2765
2766            // === Custom function structs ===
2767            Expression::Substring(f) => {
2768                stack.push(&f.this);
2769                stack.push(&f.start);
2770                if let Some(ref len) = f.length {
2771                    stack.push(len);
2772                }
2773            }
2774            Expression::Trim(f) => {
2775                stack.push(&f.this);
2776                if let Some(ref chars) = f.characters {
2777                    stack.push(chars);
2778                }
2779            }
2780            Expression::Replace(f) => {
2781                stack.push(&f.this);
2782                stack.push(&f.old);
2783                stack.push(&f.new);
2784            }
2785            Expression::IfFunc(f) => {
2786                stack.push(&f.condition);
2787                stack.push(&f.true_value);
2788                if let Some(ref fv) = f.false_value {
2789                    stack.push(fv);
2790                }
2791            }
2792            Expression::Nvl2(f) => {
2793                stack.push(&f.this);
2794                stack.push(&f.true_value);
2795                stack.push(&f.false_value);
2796            }
2797            Expression::ConcatWs(f) => {
2798                stack.push(&f.separator);
2799                for e in &f.expressions {
2800                    stack.push(e);
2801                }
2802            }
2803            Expression::Count(f) => {
2804                if let Some(ref this) = f.this {
2805                    stack.push(this);
2806                }
2807                if let Some(ref filter) = f.filter {
2808                    stack.push(filter);
2809                }
2810            }
2811            Expression::GroupConcat(f) => {
2812                stack.push(&f.this);
2813                if let Some(ref sep) = f.separator {
2814                    stack.push(sep);
2815                }
2816                if let Some(ref filter) = f.filter {
2817                    stack.push(filter);
2818                }
2819            }
2820            Expression::StringAgg(f) => {
2821                stack.push(&f.this);
2822                if let Some(ref sep) = f.separator {
2823                    stack.push(sep);
2824                }
2825                if let Some(ref filter) = f.filter {
2826                    stack.push(filter);
2827                }
2828                if let Some(ref limit) = f.limit {
2829                    stack.push(limit);
2830                }
2831            }
2832            Expression::ListAgg(f) => {
2833                stack.push(&f.this);
2834                if let Some(ref sep) = f.separator {
2835                    stack.push(sep);
2836                }
2837                if let Some(ref filter) = f.filter {
2838                    stack.push(filter);
2839                }
2840            }
2841            Expression::SumIf(f) => {
2842                stack.push(&f.this);
2843                stack.push(&f.condition);
2844                if let Some(ref filter) = f.filter {
2845                    stack.push(filter);
2846                }
2847            }
2848            Expression::DateAdd(f) | Expression::DateSub(f) => {
2849                stack.push(&f.this);
2850                stack.push(&f.interval);
2851            }
2852            Expression::DateDiff(f) => {
2853                stack.push(&f.this);
2854                stack.push(&f.expression);
2855            }
2856            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
2857                stack.push(&f.this);
2858            }
2859            Expression::Extract(f) => {
2860                stack.push(&f.this);
2861            }
2862            Expression::Round(f) => {
2863                stack.push(&f.this);
2864                if let Some(ref d) = f.decimals {
2865                    stack.push(d);
2866                }
2867            }
2868            Expression::Floor(f) => {
2869                stack.push(&f.this);
2870                if let Some(ref s) = f.scale {
2871                    stack.push(s);
2872                }
2873                if let Some(ref t) = f.to {
2874                    stack.push(t);
2875                }
2876            }
2877            Expression::Ceil(f) => {
2878                stack.push(&f.this);
2879                if let Some(ref d) = f.decimals {
2880                    stack.push(d);
2881                }
2882                if let Some(ref t) = f.to {
2883                    stack.push(t);
2884                }
2885            }
2886            Expression::Log(f) => {
2887                stack.push(&f.this);
2888                if let Some(ref b) = f.base {
2889                    stack.push(b);
2890                }
2891            }
2892            Expression::AtTimeZone(f) => {
2893                stack.push(&f.this);
2894                stack.push(&f.zone);
2895            }
2896            Expression::Lead(f) | Expression::Lag(f) => {
2897                stack.push(&f.this);
2898                if let Some(ref off) = f.offset {
2899                    stack.push(off);
2900                }
2901                if let Some(ref def) = f.default {
2902                    stack.push(def);
2903                }
2904            }
2905            Expression::FirstValue(f) | Expression::LastValue(f) => {
2906                stack.push(&f.this);
2907            }
2908            Expression::NthValue(f) => {
2909                stack.push(&f.this);
2910                stack.push(&f.offset);
2911            }
2912            Expression::Position(f) => {
2913                stack.push(&f.substring);
2914                stack.push(&f.string);
2915                if let Some(ref start) = f.start {
2916                    stack.push(start);
2917                }
2918            }
2919            Expression::Decode(f) => {
2920                stack.push(&f.this);
2921                for (search, result) in &f.search_results {
2922                    stack.push(search);
2923                    stack.push(result);
2924                }
2925                if let Some(ref def) = f.default {
2926                    stack.push(def);
2927                }
2928            }
2929            Expression::CharFunc(f) => {
2930                for arg in &f.args {
2931                    stack.push(arg);
2932                }
2933            }
2934            Expression::ArraySort(f) => {
2935                stack.push(&f.this);
2936                if let Some(ref cmp) = f.comparator {
2937                    stack.push(cmp);
2938                }
2939            }
2940            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2941                stack.push(&f.this);
2942                stack.push(&f.separator);
2943                if let Some(ref nr) = f.null_replacement {
2944                    stack.push(nr);
2945                }
2946            }
2947            Expression::ArrayFilter(f) => {
2948                stack.push(&f.this);
2949                stack.push(&f.filter);
2950            }
2951            Expression::ArrayTransform(f) => {
2952                stack.push(&f.this);
2953                stack.push(&f.transform);
2954            }
2955            Expression::Sequence(f)
2956            | Expression::Generate(f)
2957            | Expression::ExplodingGenerateSeries(f) => {
2958                stack.push(&f.start);
2959                stack.push(&f.stop);
2960                if let Some(ref step) = f.step {
2961                    stack.push(step);
2962                }
2963            }
2964            Expression::JsonExtract(f)
2965            | Expression::JsonExtractScalar(f)
2966            | Expression::JsonQuery(f)
2967            | Expression::JsonValue(f) => {
2968                stack.push(&f.this);
2969                stack.push(&f.path);
2970            }
2971            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2972                stack.push(&f.this);
2973                for p in &f.paths {
2974                    stack.push(p);
2975                }
2976            }
2977            Expression::JsonObject(f) => {
2978                for (k, v) in &f.pairs {
2979                    stack.push(k);
2980                    stack.push(v);
2981                }
2982            }
2983            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2984                stack.push(&f.this);
2985                for (path, val) in &f.path_values {
2986                    stack.push(path);
2987                    stack.push(val);
2988                }
2989            }
2990            Expression::Overlay(f) => {
2991                stack.push(&f.this);
2992                stack.push(&f.replacement);
2993                stack.push(&f.from);
2994                if let Some(ref len) = f.length {
2995                    stack.push(len);
2996                }
2997            }
2998            Expression::Convert(f) => {
2999                stack.push(&f.this);
3000                if let Some(ref style) = f.style {
3001                    stack.push(style);
3002                }
3003            }
3004            Expression::ApproxPercentile(f) => {
3005                stack.push(&f.this);
3006                stack.push(&f.percentile);
3007                if let Some(ref acc) = f.accuracy {
3008                    stack.push(acc);
3009                }
3010                if let Some(ref filter) = f.filter {
3011                    stack.push(filter);
3012                }
3013            }
3014            Expression::Percentile(f)
3015            | Expression::PercentileCont(f)
3016            | Expression::PercentileDisc(f) => {
3017                stack.push(&f.this);
3018                stack.push(&f.percentile);
3019                if let Some(ref filter) = f.filter {
3020                    stack.push(filter);
3021                }
3022            }
3023            Expression::WithinGroup(f) => {
3024                stack.push(&f.this);
3025                for e in &f.order_by {
3026                    stack.push(&e.this);
3027                }
3028            }
3029            Expression::Left(f) | Expression::Right(f) => {
3030                stack.push(&f.this);
3031                stack.push(&f.length);
3032            }
3033            Expression::Repeat(f) => {
3034                stack.push(&f.this);
3035                stack.push(&f.times);
3036            }
3037            Expression::Lpad(f) | Expression::Rpad(f) => {
3038                stack.push(&f.this);
3039                stack.push(&f.length);
3040                if let Some(ref fill) = f.fill {
3041                    stack.push(fill);
3042                }
3043            }
3044            Expression::Split(f) => {
3045                stack.push(&f.this);
3046                stack.push(&f.delimiter);
3047            }
3048            Expression::RegexpLike(f) => {
3049                stack.push(&f.this);
3050                stack.push(&f.pattern);
3051                if let Some(ref flags) = f.flags {
3052                    stack.push(flags);
3053                }
3054            }
3055            Expression::RegexpReplace(f) => {
3056                stack.push(&f.this);
3057                stack.push(&f.pattern);
3058                stack.push(&f.replacement);
3059                if let Some(ref flags) = f.flags {
3060                    stack.push(flags);
3061                }
3062            }
3063            Expression::RegexpExtract(f) => {
3064                stack.push(&f.this);
3065                stack.push(&f.pattern);
3066                if let Some(ref group) = f.group {
3067                    stack.push(group);
3068                }
3069            }
3070            Expression::ToDate(f) => {
3071                stack.push(&f.this);
3072                if let Some(ref fmt) = f.format {
3073                    stack.push(fmt);
3074                }
3075            }
3076            Expression::ToTimestamp(f) => {
3077                stack.push(&f.this);
3078                if let Some(ref fmt) = f.format {
3079                    stack.push(fmt);
3080                }
3081            }
3082            Expression::DateFormat(f) | Expression::FormatDate(f) => {
3083                stack.push(&f.this);
3084                stack.push(&f.format);
3085            }
3086            Expression::LastDay(f) => {
3087                stack.push(&f.this);
3088            }
3089            Expression::FromUnixtime(f) => {
3090                stack.push(&f.this);
3091                if let Some(ref fmt) = f.format {
3092                    stack.push(fmt);
3093                }
3094            }
3095            Expression::UnixTimestamp(f) => {
3096                if let Some(ref this) = f.this {
3097                    stack.push(this);
3098                }
3099                if let Some(ref fmt) = f.format {
3100                    stack.push(fmt);
3101                }
3102            }
3103            Expression::MakeDate(f) => {
3104                stack.push(&f.year);
3105                stack.push(&f.month);
3106                stack.push(&f.day);
3107            }
3108            Expression::MakeTimestamp(f) => {
3109                stack.push(&f.year);
3110                stack.push(&f.month);
3111                stack.push(&f.day);
3112                stack.push(&f.hour);
3113                stack.push(&f.minute);
3114                stack.push(&f.second);
3115                if let Some(ref tz) = f.timezone {
3116                    stack.push(tz);
3117                }
3118            }
3119            Expression::TruncFunc(f) => {
3120                stack.push(&f.this);
3121                if let Some(ref d) = f.decimals {
3122                    stack.push(d);
3123                }
3124            }
3125            Expression::ArrayFunc(f) => {
3126                for e in &f.expressions {
3127                    stack.push(e);
3128                }
3129            }
3130            Expression::Unnest(f) => {
3131                stack.push(&f.this);
3132                for e in &f.expressions {
3133                    stack.push(e);
3134                }
3135            }
3136            Expression::StructFunc(f) => {
3137                for (_, e) in &f.fields {
3138                    stack.push(e);
3139                }
3140            }
3141            Expression::StructExtract(f) => {
3142                stack.push(&f.this);
3143            }
3144            Expression::NamedStruct(f) => {
3145                for (k, v) in &f.pairs {
3146                    stack.push(k);
3147                    stack.push(v);
3148                }
3149            }
3150            Expression::MapFunc(f) => {
3151                for k in &f.keys {
3152                    stack.push(k);
3153                }
3154                for v in &f.values {
3155                    stack.push(v);
3156                }
3157            }
3158            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
3159                stack.push(&f.this);
3160                stack.push(&f.transform);
3161            }
3162            Expression::JsonArrayAgg(f) => {
3163                stack.push(&f.this);
3164                if let Some(ref filter) = f.filter {
3165                    stack.push(filter);
3166                }
3167            }
3168            Expression::JsonObjectAgg(f) => {
3169                stack.push(&f.key);
3170                stack.push(&f.value);
3171                if let Some(ref filter) = f.filter {
3172                    stack.push(filter);
3173                }
3174            }
3175            Expression::NTile(f) => {
3176                if let Some(ref n) = f.num_buckets {
3177                    stack.push(n);
3178                }
3179            }
3180            Expression::Rand(f) => {
3181                if let Some(ref s) = f.seed {
3182                    stack.push(s);
3183                }
3184                if let Some(ref lo) = f.lower {
3185                    stack.push(lo);
3186                }
3187                if let Some(ref hi) = f.upper {
3188                    stack.push(hi);
3189                }
3190            }
3191            Expression::Any(q) | Expression::All(q) => {
3192                stack.push(&q.this);
3193                stack.push(&q.subquery);
3194            }
3195            Expression::Overlaps(o) => {
3196                if let Some(ref this) = o.this {
3197                    stack.push(this);
3198                }
3199                if let Some(ref expr) = o.expression {
3200                    stack.push(expr);
3201                }
3202                if let Some(ref ls) = o.left_start {
3203                    stack.push(ls);
3204                }
3205                if let Some(ref le) = o.left_end {
3206                    stack.push(le);
3207                }
3208                if let Some(ref rs) = o.right_start {
3209                    stack.push(rs);
3210                }
3211                if let Some(ref re) = o.right_end {
3212                    stack.push(re);
3213                }
3214            }
3215            Expression::Interval(i) => {
3216                if let Some(ref this) = i.this {
3217                    stack.push(this);
3218                }
3219            }
3220            Expression::TimeStrToTime(f) => {
3221                stack.push(&f.this);
3222                if let Some(ref zone) = f.zone {
3223                    stack.push(zone);
3224                }
3225            }
3226            Expression::JSONBExtractScalar(f) => {
3227                stack.push(&f.this);
3228                stack.push(&f.expression);
3229                if let Some(ref jt) = f.json_type {
3230                    stack.push(jt);
3231                }
3232            }
3233            Expression::JSONExtract(f) => {
3234                stack.push(&f.this);
3235                stack.push(&f.expression);
3236                for e in &f.expressions {
3237                    stack.push(e);
3238                }
3239                if let Some(ref option) = f.option {
3240                    stack.push(option);
3241                }
3242                if let Some(ref on_condition) = f.on_condition {
3243                    stack.push(on_condition);
3244                }
3245            }
3246
3247            // === True leaves and non-expression-bearing nodes ===
3248            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
3249            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
3250            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
3251            _ => {}
3252        }
3253    }
3254}
3255
3256// ---------------------------------------------------------------------------
3257// Tests
3258// ---------------------------------------------------------------------------
3259
3260#[cfg(test)]
3261mod tests {
3262    use super::*;
3263    use crate::dialects::{Dialect, DialectType};
3264    use crate::expressions::DataType;
3265    use crate::optimizer::annotate_types::annotate_types;
3266    use crate::parse_one;
3267    use crate::schema::{MappingSchema, Schema};
3268
3269    fn parse(sql: &str) -> Expression {
3270        let dialect = Dialect::get(DialectType::Generic);
3271        let ast = dialect.parse(sql).unwrap();
3272        ast.into_iter().next().unwrap()
3273    }
3274
3275    fn parse_dialect(sql: &str, dialect_type: DialectType) -> Expression {
3276        let dialect = Dialect::get(dialect_type);
3277        let ast = dialect.parse(sql).unwrap();
3278        ast.into_iter().next().unwrap()
3279    }
3280
3281    fn lineage_names(node: &LineageNode) -> Vec<String> {
3282        node.walk().map(|n| n.name.clone()).collect()
3283    }
3284
3285    fn assert_lineage_contains(node: &LineageNode, expected: &str) {
3286        let names = lineage_names(node);
3287        assert!(
3288            names.iter().any(|name| name == expected),
3289            "expected {expected} in lineage, got {names:?}"
3290        );
3291    }
3292
3293    #[test]
3294    fn test_simple_lineage() {
3295        let expr = parse("SELECT a FROM t");
3296        let node = lineage("a", &expr, None, false).unwrap();
3297
3298        assert_eq!(node.name, "a");
3299        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
3300        // Should trace to t.a
3301        let names = node.downstream_names();
3302        assert!(
3303            names.iter().any(|n| n == "t.a"),
3304            "Expected t.a in downstream, got: {:?}",
3305            names
3306        );
3307    }
3308
3309    #[test]
3310    fn test_lineage_walk() {
3311        let root = LineageNode {
3312            name: "col_a".to_string(),
3313            expression: Expression::Null(crate::expressions::Null),
3314            source: Expression::Null(crate::expressions::Null),
3315            downstream: vec![LineageNode::new(
3316                "t.a",
3317                Expression::Null(crate::expressions::Null),
3318                Expression::Null(crate::expressions::Null),
3319            )],
3320            source_name: String::new(),
3321            source_kind: SourceKind::Unknown,
3322            source_alias: None,
3323            reference_node_name: String::new(),
3324        };
3325
3326        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
3327        assert_eq!(names.len(), 2);
3328        assert_eq!(names[0], "col_a");
3329        assert_eq!(names[1], "t.a");
3330    }
3331
3332    #[test]
3333    fn test_aliased_column() {
3334        let expr = parse("SELECT a + 1 AS b FROM t");
3335        let node = lineage("b", &expr, None, false).unwrap();
3336
3337        assert_eq!(node.name, "b");
3338        // Should trace through the expression to t.a
3339        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3340        assert!(
3341            all_names.iter().any(|n| n.contains("a")),
3342            "Expected to trace to column a, got: {:?}",
3343            all_names
3344        );
3345    }
3346
3347    #[test]
3348    fn test_qualified_column() {
3349        let expr = parse("SELECT t.a FROM t");
3350        let node = lineage("a", &expr, None, false).unwrap();
3351
3352        assert_eq!(node.name, "a");
3353        let names = node.downstream_names();
3354        assert!(
3355            names.iter().any(|n| n == "t.a"),
3356            "Expected t.a, got: {:?}",
3357            names
3358        );
3359    }
3360
3361    #[test]
3362    fn test_unqualified_column() {
3363        let expr = parse("SELECT a FROM t");
3364        let node = lineage("a", &expr, None, false).unwrap();
3365
3366        // Unqualified but single source → resolved to t.a
3367        let names = node.downstream_names();
3368        assert!(
3369            names.iter().any(|n| n == "t.a"),
3370            "Expected t.a, got: {:?}",
3371            names
3372        );
3373    }
3374
3375    #[test]
3376    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
3377        let query = "SELECT name FROM users";
3378        let dialect = Dialect::get(DialectType::BigQuery);
3379        let expr = dialect
3380            .parse(query)
3381            .unwrap()
3382            .into_iter()
3383            .next()
3384            .expect("expected one expression");
3385
3386        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3387        schema
3388            .add_table("users", &[("name".into(), DataType::Text)], None)
3389            .expect("schema setup");
3390
3391        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
3392            .expect("lineage without schema");
3393        let mut expr_without = node_without_schema.expression.clone();
3394        annotate_types(
3395            &mut expr_without,
3396            Some(&schema),
3397            Some(DialectType::BigQuery),
3398        );
3399        assert_eq!(
3400            expr_without.inferred_type(),
3401            None,
3402            "Expected unresolved root type without schema-aware lineage qualification"
3403        );
3404
3405        let node_with_schema = lineage_with_schema(
3406            "name",
3407            &expr,
3408            Some(&schema),
3409            Some(DialectType::BigQuery),
3410            false,
3411        )
3412        .expect("lineage with schema");
3413        let mut expr_with = node_with_schema.expression.clone();
3414        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
3415
3416        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
3417    }
3418
3419    #[test]
3420    fn test_lineage_with_schema_correlated_scalar_subquery() {
3421        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
3422        let dialect = Dialect::get(DialectType::BigQuery);
3423        let expr = dialect
3424            .parse(query)
3425            .unwrap()
3426            .into_iter()
3427            .next()
3428            .expect("expected one expression");
3429
3430        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3431        schema
3432            .add_table(
3433                "t1",
3434                &[("id".into(), DataType::BigInt { length: None })],
3435                None,
3436            )
3437            .expect("schema setup");
3438        schema
3439            .add_table(
3440                "t2",
3441                &[
3442                    ("id".into(), DataType::BigInt { length: None }),
3443                    ("val".into(), DataType::BigInt { length: None }),
3444                ],
3445                None,
3446            )
3447            .expect("schema setup");
3448
3449        let node = lineage_with_schema(
3450            "id",
3451            &expr,
3452            Some(&schema),
3453            Some(DialectType::BigQuery),
3454            false,
3455        )
3456        .expect("lineage_with_schema should handle correlated scalar subqueries");
3457
3458        assert_eq!(node.name, "id");
3459    }
3460
3461    #[test]
3462    fn test_lineage_with_schema_join_using() {
3463        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
3464        let dialect = Dialect::get(DialectType::BigQuery);
3465        let expr = dialect
3466            .parse(query)
3467            .unwrap()
3468            .into_iter()
3469            .next()
3470            .expect("expected one expression");
3471
3472        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3473        schema
3474            .add_table(
3475                "t1",
3476                &[("a".into(), DataType::BigInt { length: None })],
3477                None,
3478            )
3479            .expect("schema setup");
3480        schema
3481            .add_table(
3482                "t2",
3483                &[("a".into(), DataType::BigInt { length: None })],
3484                None,
3485            )
3486            .expect("schema setup");
3487
3488        let node = lineage_with_schema(
3489            "a",
3490            &expr,
3491            Some(&schema),
3492            Some(DialectType::BigQuery),
3493            false,
3494        )
3495        .expect("lineage_with_schema should handle JOIN USING");
3496
3497        assert_eq!(node.name, "a");
3498    }
3499
3500    #[test]
3501    fn test_lineage_with_schema_qualified_table_name() {
3502        let query = "SELECT a FROM raw.t1";
3503        let dialect = Dialect::get(DialectType::BigQuery);
3504        let expr = dialect
3505            .parse(query)
3506            .unwrap()
3507            .into_iter()
3508            .next()
3509            .expect("expected one expression");
3510
3511        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3512        schema
3513            .add_table(
3514                "raw.t1",
3515                &[("a".into(), DataType::BigInt { length: None })],
3516                None,
3517            )
3518            .expect("schema setup");
3519
3520        let node = lineage_with_schema(
3521            "a",
3522            &expr,
3523            Some(&schema),
3524            Some(DialectType::BigQuery),
3525            false,
3526        )
3527        .expect("lineage_with_schema should handle dotted schema.table names");
3528
3529        assert_eq!(node.name, "a");
3530    }
3531
3532    #[test]
3533    fn test_lineage_with_schema_none_matches_lineage() {
3534        let expr = parse("SELECT a FROM t");
3535        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
3536        let with_none =
3537            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
3538
3539        assert_eq!(with_none.name, baseline.name);
3540        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
3541    }
3542
3543    #[test]
3544    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
3545        let dialect = Dialect::get(DialectType::BigQuery);
3546        let expr = dialect
3547            .parse("SELECT Name AS name FROM teams")
3548            .unwrap()
3549            .into_iter()
3550            .next()
3551            .expect("expected one expression");
3552
3553        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3554        schema
3555            .add_table(
3556                "teams",
3557                &[("Name".into(), DataType::String { length: None })],
3558                None,
3559            )
3560            .expect("schema setup");
3561
3562        let node = lineage_with_schema(
3563            "name",
3564            &expr,
3565            Some(&schema),
3566            Some(DialectType::BigQuery),
3567            false,
3568        )
3569        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
3570
3571        let names = node.downstream_names();
3572        assert!(
3573            names.iter().any(|n| n == "teams.Name"),
3574            "Expected teams.Name in downstream, got: {:?}",
3575            names
3576        );
3577    }
3578
3579    #[test]
3580    fn test_lineage_bigquery_mixed_case_alias_lookup() {
3581        let dialect = Dialect::get(DialectType::BigQuery);
3582        let expr = dialect
3583            .parse("SELECT Name AS Name FROM teams")
3584            .unwrap()
3585            .into_iter()
3586            .next()
3587            .expect("expected one expression");
3588
3589        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
3590            .expect("lineage should resolve mixed-case aliases in BigQuery");
3591
3592        assert_eq!(node.name, "name");
3593    }
3594
3595    #[test]
3596    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
3597        let expr = parse_one(
3598            r#"
3599SELECT date_val AS week_start
3600FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
3601"#,
3602            DialectType::BigQuery,
3603        )
3604        .expect("parse");
3605
3606        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
3607            .expect("lineage should resolve UNNEST alias as a source");
3608        let child = node
3609            .downstream
3610            .first()
3611            .expect("week_start should have downstream lineage");
3612
3613        assert_eq!(child.name, "_0.date_val");
3614        assert_eq!(child.source_name, "_0");
3615        assert_eq!(child.source_kind, SourceKind::Virtual);
3616        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
3617
3618        let Expression::Column(column) = &child.expression else {
3619            panic!(
3620                "expected downstream column expression, got {:?}",
3621                child.expression
3622            );
3623        };
3624        assert_eq!(column.name.name, "date_val");
3625        assert_eq!(
3626            column.table.as_ref().map(|table| table.name.as_str()),
3627            Some("_0")
3628        );
3629        assert!(
3630            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
3631            "expected UNNEST source expression, got {:?}",
3632            child.source
3633        );
3634    }
3635
3636    #[test]
3637    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
3638        let expr =
3639            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
3640
3641        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3642        let child = node.downstream.first().expect("id should have lineage");
3643
3644        assert_eq!(child.name, "date_val.id");
3645        assert_eq!(child.source_name, "date_val");
3646        assert_eq!(child.source_kind, SourceKind::Table);
3647        assert_eq!(child.source_alias, None);
3648    }
3649
3650    #[test]
3651    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
3652        let expr = parse_one(
3653            r#"
3654SELECT a.a AS first_value, b.b AS second_value
3655FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
3656JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
3657"#,
3658            DialectType::BigQuery,
3659        )
3660        .expect("parse");
3661
3662        let first =
3663            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3664        let second =
3665            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3666
3667        let first_child = first.downstream.first().expect("first source");
3668        let second_child = second.downstream.first().expect("second source");
3669
3670        assert_eq!(first_child.name, "_0.a");
3671        assert_eq!(first_child.source_name, "_0");
3672        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
3673        assert_eq!(first_child.source_kind, SourceKind::Virtual);
3674
3675        assert_eq!(second_child.name, "_1.b");
3676        assert_eq!(second_child.source_name, "_1");
3677        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
3678        assert_eq!(second_child.source_kind, SourceKind::Virtual);
3679    }
3680
3681    #[test]
3682    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
3683        let expr = parse_one(
3684            r#"
3685SELECT item.item AS item
3686FROM t JOIN UNNEST(t.items) AS item ON TRUE
3687"#,
3688            DialectType::BigQuery,
3689        )
3690        .expect("parse");
3691
3692        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3693        let virtual_child = node.downstream.first().expect("virtual item source");
3694        assert_eq!(virtual_child.name, "_0.item");
3695        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3696
3697        let real_child = virtual_child
3698            .downstream
3699            .first()
3700            .expect("UNNEST(t.items) should depend on t.items");
3701        assert_eq!(real_child.name, "t.items");
3702        assert_eq!(real_child.source_name, "t");
3703        assert_eq!(real_child.source_kind, SourceKind::Table);
3704    }
3705
3706    #[test]
3707    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
3708        let expr = parse_one(
3709            r#"
3710SELECT item AS item
3711FROM t JOIN UNNEST(t.items) AS item ON TRUE
3712"#,
3713            DialectType::BigQuery,
3714        )
3715        .expect("parse");
3716
3717        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3718        let virtual_child = node.downstream.first().expect("virtual item source");
3719        assert_eq!(virtual_child.name, "_0.item");
3720        assert_eq!(virtual_child.source_name, "_0");
3721        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3722        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
3723
3724        let real_child = virtual_child
3725            .downstream
3726            .first()
3727            .expect("UNNEST(t.items) should depend on t.items");
3728        assert_eq!(real_child.name, "t.items");
3729        assert_eq!(real_child.source_name, "t");
3730        assert_eq!(real_child.source_kind, SourceKind::Table);
3731    }
3732
3733    #[test]
3734    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
3735        let cases = [
3736            (
3737                DialectType::PostgreSQL,
3738                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
3739            ),
3740            (
3741                DialectType::Presto,
3742                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3743            ),
3744            (
3745                DialectType::Trino,
3746                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
3747            ),
3748        ];
3749
3750        for (dialect, sql) in cases {
3751            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3752            let node = lineage("out", &expr, Some(dialect), false)
3753                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3754            let virtual_child = node
3755                .downstream
3756                .first()
3757                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3758
3759            assert_eq!(
3760                virtual_child.name, "_0.x",
3761                "unexpected virtual child for {dialect:?}"
3762            );
3763            assert_eq!(virtual_child.source_name, "_0");
3764            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3765            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3766
3767            let real_child = virtual_child
3768                .downstream
3769                .first()
3770                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3771            assert_eq!(real_child.name, "t.items");
3772            assert_eq!(real_child.source_kind, SourceKind::Table);
3773        }
3774    }
3775
3776    #[test]
3777    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
3778        let cases = [
3779            (
3780                DialectType::Spark,
3781                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3782            ),
3783            (
3784                DialectType::Hive,
3785                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
3786            ),
3787        ];
3788
3789        for (dialect, sql) in cases {
3790            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
3791            let node = lineage("out", &expr, Some(dialect), false)
3792                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
3793            let virtual_child = node
3794                .downstream
3795                .first()
3796                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
3797
3798            assert_eq!(virtual_child.name, "_0.x");
3799            assert_eq!(virtual_child.source_name, "_0");
3800            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3801            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
3802
3803            let real_child = virtual_child
3804                .downstream
3805                .first()
3806                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
3807            assert_eq!(real_child.name, "t.items");
3808            assert_eq!(real_child.source_kind, SourceKind::Table);
3809        }
3810    }
3811
3812    #[test]
3813    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
3814        let expr = parse_one(
3815            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
3816            DialectType::Snowflake,
3817        )
3818        .expect("parse");
3819
3820        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
3821        let virtual_child = node.downstream.first().expect("virtual flatten source");
3822        assert_eq!(virtual_child.name, "_0.value");
3823        assert_eq!(virtual_child.source_name, "_0");
3824        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3825        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
3826
3827        let real_child = virtual_child
3828            .downstream
3829            .first()
3830            .expect("FLATTEN input should depend on raw_events.payload");
3831        assert_eq!(real_child.name, "raw_events.payload");
3832        assert_eq!(real_child.source_kind, SourceKind::Table);
3833    }
3834
3835    #[test]
3836    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
3837        let expr = parse_one(
3838            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3839            DialectType::Snowflake,
3840        )
3841        .expect("parse");
3842
3843        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3844        schema
3845            .add_table(
3846                "fact.some_daily_metrics",
3847                &[("date_utc".to_string(), DataType::Date)],
3848                None,
3849            )
3850            .expect("schema setup");
3851
3852        let node = lineage_with_schema(
3853            "recency",
3854            &expr,
3855            Some(&schema),
3856            Some(DialectType::Snowflake),
3857            false,
3858        )
3859        .expect("lineage_with_schema should not treat date part as a column");
3860
3861        let names = node.downstream_names();
3862        assert!(
3863            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3864            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3865            names
3866        );
3867        assert!(
3868            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3869            "Did not expect date part to appear as lineage column, got: {:?}",
3870            names
3871        );
3872    }
3873
3874    #[test]
3875    fn test_snowflake_datediff_parses_to_typed_ast() {
3876        let expr = parse_one(
3877            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
3878            DialectType::Snowflake,
3879        )
3880        .expect("parse");
3881
3882        match expr {
3883            Expression::Select(select) => match &select.expressions[0] {
3884                Expression::Alias(alias) => match &alias.this {
3885                    Expression::DateDiff(f) => {
3886                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
3887                    }
3888                    other => panic!("expected DateDiff, got {other:?}"),
3889                },
3890                other => panic!("expected Alias, got {other:?}"),
3891            },
3892            other => panic!("expected Select, got {other:?}"),
3893        }
3894    }
3895
3896    #[test]
3897    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
3898        let expr = parse_one(
3899            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
3900            DialectType::Snowflake,
3901        )
3902        .expect("parse");
3903
3904        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3905        schema
3906            .add_table(
3907                "fact.some_daily_metrics",
3908                &[("date_utc".to_string(), DataType::Date)],
3909                None,
3910            )
3911            .expect("schema setup");
3912
3913        let node = lineage_with_schema(
3914            "next_day",
3915            &expr,
3916            Some(&schema),
3917            Some(DialectType::Snowflake),
3918            false,
3919        )
3920        .expect("lineage_with_schema should not treat DATEADD date part as a column");
3921
3922        let names = node.downstream_names();
3923        assert!(
3924            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3925            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3926            names
3927        );
3928        assert!(
3929            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3930            "Did not expect date part to appear as lineage column, got: {:?}",
3931            names
3932        );
3933    }
3934
3935    #[test]
3936    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
3937        let expr = parse_one(
3938            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
3939            DialectType::Snowflake,
3940        )
3941        .expect("parse");
3942
3943        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3944        schema
3945            .add_table(
3946                "fact.some_daily_metrics",
3947                &[("date_utc".to_string(), DataType::Date)],
3948                None,
3949            )
3950            .expect("schema setup");
3951
3952        let node = lineage_with_schema(
3953            "day_part",
3954            &expr,
3955            Some(&schema),
3956            Some(DialectType::Snowflake),
3957            false,
3958        )
3959        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
3960
3961        let names = node.downstream_names();
3962        assert!(
3963            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
3964            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
3965            names
3966        );
3967        assert!(
3968            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
3969            "Did not expect date part to appear as lineage column, got: {:?}",
3970            names
3971        );
3972    }
3973
3974    #[test]
3975    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
3976        let expr = parse_one(
3977            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
3978            DialectType::Snowflake,
3979        )
3980        .expect("parse");
3981
3982        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
3983        schema
3984            .add_table(
3985                "fact.some_daily_metrics",
3986                &[("date_utc".to_string(), DataType::Date)],
3987                None,
3988            )
3989            .expect("schema setup");
3990
3991        let node = lineage_with_schema(
3992            "day_part",
3993            &expr,
3994            Some(&schema),
3995            Some(DialectType::Snowflake),
3996            false,
3997        )
3998        .expect("quoted DATE_PART should continue to work");
3999
4000        let names = node.downstream_names();
4001        assert!(
4002            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4003            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4004            names
4005        );
4006    }
4007
4008    #[test]
4009    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
4010        let expr = parse_one(
4011            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4012            DialectType::Snowflake,
4013        )
4014        .expect("parse");
4015
4016        match expr {
4017            Expression::Select(select) => match &select.expressions[0] {
4018                Expression::Alias(alias) => match &alias.this {
4019                    Expression::Function(f) => {
4020                        assert_eq!(f.name.to_uppercase(), "DATEADD");
4021                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4022                    }
4023                    other => panic!("expected generic DATEADD function, got {other:?}"),
4024                },
4025                other => panic!("expected Alias, got {other:?}"),
4026            },
4027            other => panic!("expected Select, got {other:?}"),
4028        }
4029    }
4030
4031    #[test]
4032    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
4033        let expr = parse_one(
4034            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4035            DialectType::Snowflake,
4036        )
4037        .expect("parse");
4038
4039        match expr {
4040            Expression::Select(select) => match &select.expressions[0] {
4041                Expression::Alias(alias) => match &alias.this {
4042                    Expression::Function(f) => {
4043                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
4044                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4045                    }
4046                    other => panic!("expected generic DATE_PART function, got {other:?}"),
4047                },
4048                other => panic!("expected Alias, got {other:?}"),
4049            },
4050            other => panic!("expected Select, got {other:?}"),
4051        }
4052    }
4053
4054    #[test]
4055    fn test_snowflake_date_part_string_literal_stays_generic_function() {
4056        let expr = parse_one(
4057            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4058            DialectType::Snowflake,
4059        )
4060        .expect("parse");
4061
4062        match expr {
4063            Expression::Select(select) => match &select.expressions[0] {
4064                Expression::Alias(alias) => match &alias.this {
4065                    Expression::Function(f) => {
4066                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
4067                    }
4068                    other => panic!("expected generic DATE_PART function, got {other:?}"),
4069                },
4070                other => panic!("expected Alias, got {other:?}"),
4071            },
4072            other => panic!("expected Select, got {other:?}"),
4073        }
4074    }
4075
4076    #[test]
4077    fn test_lineage_join() {
4078        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4079
4080        let node_a = lineage("a", &expr, None, false).unwrap();
4081        let names_a = node_a.downstream_names();
4082        assert!(
4083            names_a.iter().any(|n| n == "t.a"),
4084            "Expected t.a, got: {:?}",
4085            names_a
4086        );
4087
4088        let node_b = lineage("b", &expr, None, false).unwrap();
4089        let names_b = node_b.downstream_names();
4090        assert!(
4091            names_b.iter().any(|n| n == "s.b"),
4092            "Expected s.b, got: {:?}",
4093            names_b
4094        );
4095    }
4096
4097    #[test]
4098    fn test_lineage_alias_leaf_has_resolved_source_name() {
4099        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
4100        let node = lineage("col1", &expr, None, false).unwrap();
4101
4102        // Keep alias in the display lineage edge.
4103        let names = node.downstream_names();
4104        assert!(
4105            names.iter().any(|n| n == "t1.col1"),
4106            "Expected aliased column edge t1.col1, got: {:?}",
4107            names
4108        );
4109
4110        // Leaf should expose the resolved base table for consumers.
4111        let leaf = node
4112            .downstream
4113            .iter()
4114            .find(|n| n.name == "t1.col1")
4115            .expect("Expected t1.col1 leaf");
4116        assert_eq!(leaf.source_name, "table1");
4117        match &leaf.source {
4118            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
4119            _ => panic!("Expected leaf source to be a table expression"),
4120        }
4121    }
4122
4123    #[test]
4124    fn test_lineage_derived_table() {
4125        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
4126        let node = lineage("a", &expr, None, false).unwrap();
4127
4128        assert_eq!(node.name, "a");
4129        // Should trace through the derived table to t.a
4130        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4131        assert!(
4132            all_names.iter().any(|n| n == "t.a"),
4133            "Expected to trace through derived table to t.a, got: {:?}",
4134            all_names
4135        );
4136    }
4137
4138    #[test]
4139    fn test_lineage_cte() {
4140        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
4141        let node = lineage("a", &expr, None, false).unwrap();
4142
4143        assert_eq!(node.name, "a");
4144        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4145        assert!(
4146            all_names.iter().any(|n| n == "t.a"),
4147            "Expected to trace through CTE to t.a, got: {:?}",
4148            all_names
4149        );
4150    }
4151
4152    #[test]
4153    fn test_lineage_union() {
4154        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
4155        let node = lineage("a", &expr, None, false).unwrap();
4156
4157        assert_eq!(node.name, "a");
4158        // Should have 2 downstream branches
4159        assert_eq!(
4160            node.downstream.len(),
4161            2,
4162            "Expected 2 branches for UNION, got {}",
4163            node.downstream.len()
4164        );
4165    }
4166
4167    #[test]
4168    fn test_lineage_cte_union() {
4169        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
4170        let node = lineage("a", &expr, None, false).unwrap();
4171
4172        // Should trace through CTE into both UNION branches
4173        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4174        assert!(
4175            all_names.len() >= 3,
4176            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
4177            all_names
4178        );
4179    }
4180
4181    #[test]
4182    fn test_lineage_star() {
4183        let expr = parse("SELECT * FROM t");
4184        let node = lineage("*", &expr, None, false).unwrap();
4185
4186        assert_eq!(node.name, "*");
4187        // Should have downstream for table t
4188        assert!(
4189            !node.downstream.is_empty(),
4190            "Star should produce downstream nodes"
4191        );
4192    }
4193
4194    #[test]
4195    fn test_lineage_subquery_in_select() {
4196        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
4197        let node = lineage("x", &expr, None, false).unwrap();
4198
4199        assert_eq!(node.name, "x");
4200        // Should have traced into the scalar subquery
4201        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4202        assert!(
4203            all_names.len() >= 2,
4204            "Expected tracing into scalar subquery, got: {:?}",
4205            all_names
4206        );
4207    }
4208
4209    #[test]
4210    fn test_lineage_multiple_columns() {
4211        let expr = parse("SELECT a, b FROM t");
4212
4213        let node_a = lineage("a", &expr, None, false).unwrap();
4214        let node_b = lineage("b", &expr, None, false).unwrap();
4215
4216        assert_eq!(node_a.name, "a");
4217        assert_eq!(node_b.name, "b");
4218
4219        // Each should trace independently
4220        let names_a = node_a.downstream_names();
4221        let names_b = node_b.downstream_names();
4222        assert!(names_a.iter().any(|n| n == "t.a"));
4223        assert!(names_b.iter().any(|n| n == "t.b"));
4224    }
4225
4226    #[test]
4227    fn test_get_source_tables() {
4228        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4229        let node = lineage("a", &expr, None, false).unwrap();
4230
4231        let tables = get_source_tables(&node);
4232        assert!(
4233            tables.contains("t"),
4234            "Expected source table 't', got: {:?}",
4235            tables
4236        );
4237    }
4238
4239    #[test]
4240    fn test_lineage_column_not_found() {
4241        let expr = parse("SELECT a FROM t");
4242        let result = lineage("nonexistent", &expr, None, false);
4243        assert!(result.is_err());
4244    }
4245
4246    #[test]
4247    fn test_lineage_nested_cte() {
4248        let expr = parse(
4249            "WITH cte1 AS (SELECT a FROM t), \
4250             cte2 AS (SELECT a FROM cte1) \
4251             SELECT a FROM cte2",
4252        );
4253        let node = lineage("a", &expr, None, false).unwrap();
4254
4255        // Should trace through cte2 → cte1 → t
4256        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4257        assert!(
4258            all_names.len() >= 3,
4259            "Expected to trace through nested CTEs, got: {:?}",
4260            all_names
4261        );
4262    }
4263
4264    #[test]
4265    fn test_trim_selects_true() {
4266        let expr = parse("SELECT a, b, c FROM t");
4267        let node = lineage("a", &expr, None, true).unwrap();
4268
4269        // The source should be trimmed to only include 'a'
4270        if let Expression::Select(select) = &node.source {
4271            assert_eq!(
4272                select.expressions.len(),
4273                1,
4274                "Trimmed source should have 1 expression, got {}",
4275                select.expressions.len()
4276            );
4277        } else {
4278            panic!("Expected Select source");
4279        }
4280    }
4281
4282    #[test]
4283    fn test_trim_selects_false() {
4284        let expr = parse("SELECT a, b, c FROM t");
4285        let node = lineage("a", &expr, None, false).unwrap();
4286
4287        // The source should keep all columns
4288        if let Expression::Select(select) = &node.source {
4289            assert_eq!(
4290                select.expressions.len(),
4291                3,
4292                "Untrimmed source should have 3 expressions"
4293            );
4294        } else {
4295            panic!("Expected Select source");
4296        }
4297    }
4298
4299    #[test]
4300    fn test_lineage_expression_in_select() {
4301        let expr = parse("SELECT a + b AS c FROM t");
4302        let node = lineage("c", &expr, None, false).unwrap();
4303
4304        // Should trace to both a and b from t
4305        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4306        assert!(
4307            all_names.len() >= 3,
4308            "Expected to trace a + b to both columns, got: {:?}",
4309            all_names
4310        );
4311    }
4312
4313    #[test]
4314    fn test_set_operation_by_index() {
4315        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
4316
4317        // Trace column "a" which is at index 0
4318        let node = lineage("a", &expr, None, false).unwrap();
4319
4320        // UNION branches should be traced by index
4321        assert_eq!(node.downstream.len(), 2);
4322    }
4323
4324    // --- Tests for column lineage inside function calls (issue #18) ---
4325
4326    fn print_node(node: &LineageNode, indent: usize) {
4327        let pad = "  ".repeat(indent);
4328        println!(
4329            "{pad}name={:?} source_name={:?}",
4330            node.name, node.source_name
4331        );
4332        for child in &node.downstream {
4333            print_node(child, indent + 1);
4334        }
4335    }
4336
4337    #[test]
4338    fn test_issue18_repro() {
4339        // Exact scenario from the issue
4340        let query = "SELECT UPPER(name) as upper_name FROM users";
4341        println!("Query: {query}\n");
4342
4343        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
4344        let exprs = dialect.parse(query).unwrap();
4345        let expr = &exprs[0];
4346
4347        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
4348        println!("lineage(\"upper_name\"):");
4349        print_node(&node, 1);
4350
4351        let names = node.downstream_names();
4352        assert!(
4353            names.iter().any(|n| n == "users.name"),
4354            "Expected users.name in downstream, got: {:?}",
4355            names
4356        );
4357    }
4358
4359    #[test]
4360    fn test_lineage_bigquery_safe_namespace_issue207() {
4361        let query = r#"
4362WITH import_cte AS (
4363  SELECT timestamp, data, operation
4364  FROM `project`.`dataset`.`source_table`
4365),
4366transform_cte AS (
4367  SELECT
4368    timestamp,
4369    SAFE.PARSE_JSON(data) AS json_data
4370  FROM import_cte
4371)
4372SELECT json_data FROM transform_cte
4373"#;
4374        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
4375        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4376            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4377        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4378
4379        assert!(
4380            names.iter().any(|name| name == "source_table.data"),
4381            "expected source_table.data in lineage, got {names:?}"
4382        );
4383        assert!(
4384            !names
4385                .iter()
4386                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
4387            "did not expect SAFE namespace receiver in lineage, got {names:?}"
4388        );
4389    }
4390
4391    #[test]
4392    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
4393        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
4394        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4395            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4396        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4397
4398        assert!(
4399            names.iter().any(|name| name == "t.data"),
4400            "expected t.data in lineage, got {names:?}"
4401        );
4402        assert!(
4403            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
4404            "did not expect SAFE namespace receiver in lineage, got {names:?}"
4405        );
4406    }
4407
4408    #[test]
4409    fn test_lineage_method_call_receiver_control() {
4410        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
4411        let node = lineage("out", &expr, None, false).expect("lineage");
4412        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4413
4414        assert!(
4415            names.iter().any(|name| name == "t.obj"),
4416            "expected ordinary method receiver to remain in lineage, got {names:?}"
4417        );
4418        assert!(
4419            names.iter().any(|name| name == "t.arg"),
4420            "expected method argument in lineage, got {names:?}"
4421        );
4422    }
4423
4424    #[test]
4425    fn test_lineage_upper_function() {
4426        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
4427        let node = lineage("upper_name", &expr, None, false).unwrap();
4428
4429        let names = node.downstream_names();
4430        assert!(
4431            names.iter().any(|n| n == "users.name"),
4432            "Expected users.name in downstream, got: {:?}",
4433            names
4434        );
4435    }
4436
4437    #[test]
4438    fn test_lineage_round_function() {
4439        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
4440        let node = lineage("rounded", &expr, None, false).unwrap();
4441
4442        let names = node.downstream_names();
4443        assert!(
4444            names.iter().any(|n| n == "products.price"),
4445            "Expected products.price in downstream, got: {:?}",
4446            names
4447        );
4448    }
4449
4450    #[test]
4451    fn test_lineage_coalesce_function() {
4452        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
4453        let node = lineage("val", &expr, None, false).unwrap();
4454
4455        let names = node.downstream_names();
4456        assert!(
4457            names.iter().any(|n| n == "t.a"),
4458            "Expected t.a in downstream, got: {:?}",
4459            names
4460        );
4461        assert!(
4462            names.iter().any(|n| n == "t.b"),
4463            "Expected t.b in downstream, got: {:?}",
4464            names
4465        );
4466    }
4467
4468    #[test]
4469    fn test_lineage_count_function() {
4470        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
4471        let node = lineage("cnt", &expr, None, false).unwrap();
4472
4473        let names = node.downstream_names();
4474        assert!(
4475            names.iter().any(|n| n == "t.id"),
4476            "Expected t.id in downstream, got: {:?}",
4477            names
4478        );
4479    }
4480
4481    #[test]
4482    fn test_lineage_sum_function() {
4483        let expr = parse("SELECT SUM(amount) AS total FROM t");
4484        let node = lineage("total", &expr, None, false).unwrap();
4485
4486        let names = node.downstream_names();
4487        assert!(
4488            names.iter().any(|n| n == "t.amount"),
4489            "Expected t.amount in downstream, got: {:?}",
4490            names
4491        );
4492    }
4493
4494    #[test]
4495    fn test_lineage_case_with_nested_functions() {
4496        let expr =
4497            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
4498        let node = lineage("result", &expr, None, false).unwrap();
4499
4500        let names = node.downstream_names();
4501        assert!(
4502            names.iter().any(|n| n == "t.x"),
4503            "Expected t.x in downstream, got: {:?}",
4504            names
4505        );
4506        assert!(
4507            names.iter().any(|n| n == "t.name"),
4508            "Expected t.name in downstream, got: {:?}",
4509            names
4510        );
4511    }
4512
4513    #[test]
4514    fn test_lineage_substring_function() {
4515        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
4516        let node = lineage("short", &expr, None, false).unwrap();
4517
4518        let names = node.downstream_names();
4519        assert!(
4520            names.iter().any(|n| n == "t.name"),
4521            "Expected t.name in downstream, got: {:?}",
4522            names
4523        );
4524    }
4525
4526    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
4527
4528    #[test]
4529    fn test_lineage_cte_select_star() {
4530        // Ported from sqlglot: test_lineage_source_with_star
4531        // WITH y AS (SELECT * FROM x) SELECT a FROM y
4532        // After star expansion: SELECT y.a AS a FROM y
4533        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
4534        let node = lineage("a", &expr, None, false).unwrap();
4535
4536        assert_eq!(node.name, "a");
4537        // Should successfully resolve column 'a' through the CTE
4538        // (previously failed with "Cannot find column 'a' in query")
4539        assert!(
4540            !node.downstream.is_empty(),
4541            "Expected downstream nodes tracing through CTE, got none"
4542        );
4543    }
4544
4545    #[test]
4546    fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
4547        let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
4548        let node = lineage("x", &expr, None, false).unwrap();
4549
4550        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4551        assert!(
4552            all_names.iter().any(|name| name == "t.x"),
4553            "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
4554            all_names
4555        );
4556
4557        let cte_node = node
4558            .walk()
4559            .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
4560            .expect("expected CTE hop with source_name c");
4561        assert_eq!(cte_node.source_kind, SourceKind::Cte);
4562        assert_eq!(cte_node.source_name, "c");
4563    }
4564
4565    #[test]
4566    fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
4567        let expr = parse(
4568            "WITH c AS (SELECT * FROM t) \
4569             SELECT SUM(c.x) AS s FROM c GROUP BY 1",
4570        );
4571        let node = lineage("s", &expr, None, false).unwrap();
4572
4573        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4574        assert!(
4575            all_names.iter().any(|name| name == "t.x"),
4576            "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
4577            all_names
4578        );
4579    }
4580
4581    #[test]
4582    fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
4583        let expr = parse(
4584            "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
4585             SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
4586        );
4587        let node = lineage("s", &expr, None, false).unwrap();
4588
4589        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4590        assert!(
4591            all_names.iter().any(|name| name == "t2.x"),
4592            "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
4593            all_names
4594        );
4595    }
4596
4597    #[test]
4598    fn test_lineage_schema_less_chained_cte_star_passthrough() {
4599        let expr = parse(
4600            "WITH c1 AS (SELECT * FROM t), \
4601             c2 AS (SELECT * FROM c1), \
4602             c3 AS (SELECT * FROM c2) \
4603             SELECT c3.x FROM c3",
4604        );
4605        let node = lineage("x", &expr, None, false).unwrap();
4606
4607        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4608        assert!(
4609            all_names.iter().any(|name| name == "t.x"),
4610            "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4611            all_names
4612        );
4613    }
4614
4615    #[test]
4616    fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4617        let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4618        let result = lineage("x", &expr, None, false);
4619
4620        assert!(
4621            result.is_err(),
4622            "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4623            result
4624        );
4625    }
4626
4627    #[test]
4628    fn test_lineage_cte_select_star_renamed_column() {
4629        // dbt standard pattern: CTE with column rename + outer SELECT *
4630        // This is the primary use case for dbt projects (jaffle-shop etc.)
4631        let expr =
4632            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
4633        let node = lineage("customer_id", &expr, None, false).unwrap();
4634
4635        assert_eq!(node.name, "customer_id");
4636        // Should trace customer_id → renamed CTE → source.id
4637        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4638        assert!(
4639            all_names.len() >= 2,
4640            "Expected at least 2 nodes (customer_id → source), got: {:?}",
4641            all_names
4642        );
4643    }
4644
4645    #[test]
4646    fn test_lineage_cte_select_star_multiple_columns() {
4647        // CTE exposes multiple columns, outer SELECT * should resolve each
4648        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
4649
4650        for col in &["a", "b", "c"] {
4651            let node = lineage(col, &expr, None, false).unwrap();
4652            assert_eq!(node.name, *col);
4653            // Verify lineage resolves without error (star expanded to explicit columns)
4654            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4655            assert!(
4656                all_names.len() >= 2,
4657                "Expected at least 2 nodes for column {}, got: {:?}",
4658                col,
4659                all_names
4660            );
4661        }
4662    }
4663
4664    #[test]
4665    fn test_lineage_nested_cte_select_star() {
4666        // Nested CTE star expansion: cte2 references cte1 via SELECT *
4667        let expr = parse(
4668            "WITH cte1 AS (SELECT a FROM t), \
4669             cte2 AS (SELECT * FROM cte1) \
4670             SELECT * FROM cte2",
4671        );
4672        let node = lineage("a", &expr, None, false).unwrap();
4673
4674        assert_eq!(node.name, "a");
4675        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4676        assert!(
4677            all_names.len() >= 3,
4678            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
4679            all_names
4680        );
4681    }
4682
4683    #[test]
4684    fn test_lineage_three_level_nested_cte_star() {
4685        // Three-level nested CTE: cte3 → cte2 → cte1 → t
4686        let expr = parse(
4687            "WITH cte1 AS (SELECT x FROM t), \
4688             cte2 AS (SELECT * FROM cte1), \
4689             cte3 AS (SELECT * FROM cte2) \
4690             SELECT * FROM cte3",
4691        );
4692        let node = lineage("x", &expr, None, false).unwrap();
4693
4694        assert_eq!(node.name, "x");
4695        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4696        assert!(
4697            all_names.len() >= 4,
4698            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
4699            all_names
4700        );
4701    }
4702
4703    #[test]
4704    fn test_lineage_cte_union_star() {
4705        // CTE with UNION body, outer SELECT * should resolve from left branch
4706        let expr = parse(
4707            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
4708             SELECT * FROM cte",
4709        );
4710        let node = lineage("a", &expr, None, false).unwrap();
4711
4712        assert_eq!(node.name, "a");
4713        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4714        assert!(
4715            all_names.len() >= 2,
4716            "Expected at least 2 nodes for CTE union star, got: {:?}",
4717            all_names
4718        );
4719    }
4720
4721    #[test]
4722    fn test_lineage_cte_star_unknown_table() {
4723        // When CTE references an unknown table, star expansion is skipped gracefully
4724        // and lineage falls back to normal resolution (which may fail)
4725        let expr = parse(
4726            "WITH cte AS (SELECT * FROM unknown_table) \
4727             SELECT * FROM cte",
4728        );
4729        // This should not panic — it may succeed or fail depending on resolution,
4730        // but should not crash
4731        let _result = lineage("x", &expr, None, false);
4732    }
4733
4734    #[test]
4735    fn test_lineage_cte_explicit_columns() {
4736        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
4737        let expr = parse(
4738            "WITH cte(x, y) AS (SELECT a, b FROM t) \
4739             SELECT * FROM cte",
4740        );
4741        let node = lineage("x", &expr, None, false).unwrap();
4742        assert_eq!(node.name, "x");
4743    }
4744
4745    #[test]
4746    fn test_lineage_cte_qualified_star() {
4747        // Qualified star: SELECT cte.* FROM cte
4748        let expr = parse(
4749            "WITH cte AS (SELECT a, b FROM t) \
4750             SELECT cte.* FROM cte",
4751        );
4752        for col in &["a", "b"] {
4753            let node = lineage(col, &expr, None, false).unwrap();
4754            assert_eq!(node.name, *col);
4755            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4756            assert!(
4757                all_names.len() >= 2,
4758                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
4759                col,
4760                all_names
4761            );
4762        }
4763    }
4764
4765    #[test]
4766    fn test_lineage_subquery_select_star() {
4767        // Ported from sqlglot: test_select_star
4768        // SELECT x FROM (SELECT * FROM table_a)
4769        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
4770        let node = lineage("x", &expr, None, false).unwrap();
4771
4772        assert_eq!(node.name, "x");
4773        assert!(
4774            !node.downstream.is_empty(),
4775            "Expected downstream nodes for subquery with SELECT *, got none"
4776        );
4777    }
4778
4779    #[test]
4780    fn test_lineage_cte_star_with_schema_external_table() {
4781        // CTE references an external table via SELECT * — schema enables expansion
4782        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
4783SELECT * FROM orders"#;
4784        let expr = parse(sql);
4785
4786        let mut schema = MappingSchema::new();
4787        let cols = vec![
4788            ("order_id".to_string(), DataType::Unknown),
4789            ("customer_id".to_string(), DataType::Unknown),
4790            ("amount".to_string(), DataType::Unknown),
4791        ];
4792        schema.add_table("stg_orders", &cols, None).unwrap();
4793
4794        let node =
4795            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4796                .unwrap();
4797        assert_eq!(node.name, "order_id");
4798    }
4799
4800    #[test]
4801    fn test_lineage_cte_star_with_schema_three_part_name() {
4802        // CTE references an external table with fully-qualified 3-part name
4803        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
4804SELECT * FROM orders"#;
4805        let expr = parse(sql);
4806
4807        let mut schema = MappingSchema::new();
4808        let cols = vec![
4809            ("order_id".to_string(), DataType::Unknown),
4810            ("customer_id".to_string(), DataType::Unknown),
4811        ];
4812        schema
4813            .add_table("db.schema.stg_orders", &cols, None)
4814            .unwrap();
4815
4816        let node = lineage_with_schema(
4817            "customer_id",
4818            &expr,
4819            Some(&schema as &dyn Schema),
4820            None,
4821            false,
4822        )
4823        .unwrap();
4824        assert_eq!(node.name, "customer_id");
4825    }
4826
4827    #[test]
4828    fn test_lineage_cte_star_with_schema_nested() {
4829        // Nested CTEs: outer CTE references inner CTE with SELECT *,
4830        // inner CTE references external table with SELECT *
4831        let sql = r#"WITH
4832            raw AS (SELECT * FROM external_table),
4833            enriched AS (SELECT * FROM raw)
4834        SELECT * FROM enriched"#;
4835        let expr = parse(sql);
4836
4837        let mut schema = MappingSchema::new();
4838        let cols = vec![
4839            ("id".to_string(), DataType::Unknown),
4840            ("name".to_string(), DataType::Unknown),
4841        ];
4842        schema.add_table("external_table", &cols, None).unwrap();
4843
4844        let node =
4845            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4846        assert_eq!(node.name, "name");
4847    }
4848
4849    #[test]
4850    fn test_lineage_cte_qualified_star_with_schema() {
4851        // CTE uses qualified star (orders.*) from a CTE whose columns
4852        // come from an external table via SELECT *
4853        let sql = r#"WITH
4854            orders AS (SELECT * FROM stg_orders),
4855            enriched AS (
4856                SELECT orders.*, 'extra' AS extra
4857                FROM orders
4858            )
4859        SELECT * FROM enriched"#;
4860        let expr = parse(sql);
4861
4862        let mut schema = MappingSchema::new();
4863        let cols = vec![
4864            ("order_id".to_string(), DataType::Unknown),
4865            ("total".to_string(), DataType::Unknown),
4866        ];
4867        schema.add_table("stg_orders", &cols, None).unwrap();
4868
4869        let node =
4870            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4871                .unwrap();
4872        assert_eq!(node.name, "order_id");
4873
4874        // Also verify the extra column works
4875        let extra =
4876            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
4877        assert_eq!(extra.name, "extra");
4878    }
4879
4880    #[test]
4881    fn test_lineage_cte_star_without_schema_still_works() {
4882        // Without schema, CTE-to-CTE star expansion still works
4883        let sql = r#"WITH
4884            cte1 AS (SELECT id, name FROM raw_table),
4885            cte2 AS (SELECT * FROM cte1)
4886        SELECT * FROM cte2"#;
4887        let expr = parse(sql);
4888
4889        // No schema — should still resolve through CTE chain
4890        let node = lineage("id", &expr, None, false).unwrap();
4891        assert_eq!(node.name, "id");
4892    }
4893
4894    #[test]
4895    fn test_lineage_nested_cte_star_with_join_and_schema() {
4896        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
4897        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
4898        let sql = r#"WITH
4899base_orders AS (
4900    SELECT * FROM stg_orders
4901),
4902with_payments AS (
4903    SELECT
4904        base_orders.*,
4905        p.amount
4906    FROM base_orders
4907    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
4908),
4909final_cte AS (
4910    SELECT * FROM with_payments
4911)
4912SELECT * FROM final_cte"#;
4913        let expr = parse(sql);
4914
4915        let mut schema = MappingSchema::new();
4916        let order_cols = vec![
4917            (
4918                "order_id".to_string(),
4919                crate::expressions::DataType::Unknown,
4920            ),
4921            (
4922                "customer_id".to_string(),
4923                crate::expressions::DataType::Unknown,
4924            ),
4925            ("status".to_string(), crate::expressions::DataType::Unknown),
4926        ];
4927        let pay_cols = vec![
4928            (
4929                "payment_id".to_string(),
4930                crate::expressions::DataType::Unknown,
4931            ),
4932            (
4933                "order_id".to_string(),
4934                crate::expressions::DataType::Unknown,
4935            ),
4936            ("amount".to_string(), crate::expressions::DataType::Unknown),
4937        ];
4938        schema.add_table("stg_orders", &order_cols, None).unwrap();
4939        schema.add_table("stg_payments", &pay_cols, None).unwrap();
4940
4941        // order_id should trace back to stg_orders
4942        let node =
4943            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
4944                .unwrap();
4945        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4946
4947        // The leaf should be "stg_orders.order_id" (not just "order_id")
4948        let has_table_qualified = all_names
4949            .iter()
4950            .any(|n| n.contains('.') && n.contains("order_id"));
4951        assert!(
4952            has_table_qualified,
4953            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
4954            all_names
4955        );
4956
4957        // amount should trace back to stg_payments
4958        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
4959            .unwrap();
4960        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4961
4962        let has_table_qualified = all_names
4963            .iter()
4964            .any(|n| n.contains('.') && n.contains("amount"));
4965        assert!(
4966            has_table_qualified,
4967            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
4968            all_names
4969        );
4970    }
4971
4972    #[test]
4973    fn test_lineage_cte_alias_resolution() {
4974        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
4975        let sql = r#"WITH import_stg_items AS (
4976    SELECT item_id, name, status FROM stg_items
4977)
4978SELECT base.item_id, base.status
4979FROM import_stg_items AS base"#;
4980        let expr = parse(sql);
4981
4982        let node = lineage("item_id", &expr, None, false).unwrap();
4983        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4984        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
4985        assert!(
4986            all_names.iter().any(|n| n == "stg_items.item_id"),
4987            "Expected leaf 'stg_items.item_id', got: {:?}",
4988            all_names
4989        );
4990    }
4991
4992    #[test]
4993    fn test_lineage_cte_alias_with_schema_and_star() {
4994        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
4995        let sql = r#"WITH import_stg AS (
4996    SELECT * FROM stg_items
4997)
4998SELECT base.item_id, base.status
4999FROM import_stg AS base"#;
5000        let expr = parse(sql);
5001
5002        let mut schema = MappingSchema::new();
5003        schema
5004            .add_table(
5005                "stg_items",
5006                &[
5007                    ("item_id".to_string(), DataType::Unknown),
5008                    ("name".to_string(), DataType::Unknown),
5009                    ("status".to_string(), DataType::Unknown),
5010                ],
5011                None,
5012            )
5013            .unwrap();
5014
5015        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
5016            .unwrap();
5017        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5018        assert!(
5019            all_names.iter().any(|n| n == "stg_items.item_id"),
5020            "Expected leaf 'stg_items.item_id', got: {:?}",
5021            all_names
5022        );
5023    }
5024
5025    #[test]
5026    fn test_lineage_cte_alias_with_join() {
5027        // Multiple CTE aliases in a JOIN: each should resolve independently
5028        let sql = r#"WITH
5029    import_users AS (SELECT id, name FROM users),
5030    import_orders AS (SELECT id, user_id, amount FROM orders)
5031SELECT u.name, o.amount
5032FROM import_users AS u
5033LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
5034        let expr = parse(sql);
5035
5036        let node = lineage("name", &expr, None, false).unwrap();
5037        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5038        assert!(
5039            all_names.iter().any(|n| n == "users.name"),
5040            "Expected leaf 'users.name', got: {:?}",
5041            all_names
5042        );
5043
5044        let node = lineage("amount", &expr, None, false).unwrap();
5045        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5046        assert!(
5047            all_names.iter().any(|n| n == "orders.amount"),
5048            "Expected leaf 'orders.amount', got: {:?}",
5049            all_names
5050        );
5051    }
5052
5053    // -----------------------------------------------------------------------
5054    // Quoted CTE name tests — verifying SQL identifier case semantics
5055    // -----------------------------------------------------------------------
5056
5057    #[test]
5058    fn test_lineage_unquoted_cte_case_insensitive() {
5059        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
5060        // MyCte and MYCTE should match.
5061        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
5062        let node = lineage("col", &expr, None, false).unwrap();
5063        assert_eq!(node.name, "col");
5064        assert!(
5065            !node.downstream.is_empty(),
5066            "Unquoted CTE should resolve case-insensitively"
5067        );
5068    }
5069
5070    #[test]
5071    fn test_lineage_quoted_cte_case_preserved() {
5072        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
5073        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
5074        let node = lineage("col", &expr, None, false).unwrap();
5075        assert_eq!(node.name, "col");
5076        assert!(
5077            !node.downstream.is_empty(),
5078            "Quoted CTE with matching case should resolve"
5079        );
5080    }
5081
5082    #[test]
5083    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
5084        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
5085        // sqlglot treats this as a table reference, not a CTE match.
5086        // Star expansion should NOT resolve through the CTE.
5087        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
5088        // lineage("col", ...) should fail because "mycte" is treated as an external
5089        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
5090        let result = lineage("col", &expr, None, false);
5091        assert!(
5092            result.is_err(),
5093            "Quoted CTE with case mismatch should not expand star: {:?}",
5094            result
5095        );
5096    }
5097
5098    #[test]
5099    fn test_lineage_mixed_quoted_unquoted_cte() {
5100        // Mix of unquoted and quoted CTEs in a nested chain.
5101        let expr = parse(
5102            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
5103        );
5104        let node = lineage("a", &expr, None, false).unwrap();
5105        assert_eq!(node.name, "a");
5106        assert!(
5107            !node.downstream.is_empty(),
5108            "Mixed quoted/unquoted CTE chain should resolve"
5109        );
5110    }
5111
5112    // -----------------------------------------------------------------------
5113    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
5114    // -----------------------------------------------------------------------
5115    //
5116    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
5117    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
5118    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
5119    // direct string comparison for CTE name matching, ignoring the quoted status.
5120    //
5121    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
5122    // and unquoted as case-insensitive. The scope system should do the same.
5123    //
    // Fixing these requires coordinated changes to CTE resolution in both scope.rs
    // and lineage.rs, which is beyond the scope of the star-expansion fix these
    // tests accompany.
5126
5127    #[test]
5128    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
5129        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
5130        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
5131        // as "mycte" incorrectly resolves to the CTE.
5132        //
5133        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
5134        // case-sensitive: "mycte" should NOT match CTE "MyCte".
5135        //
5136        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
5137        // this test should fail — update the assertion to match correct behavior:
5138        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
5139        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
5140        let node = lineage("col", &expr, None, false).unwrap();
5141        assert!(!node.downstream.is_empty());
5142        let child = &node.downstream[0];
5143        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
5144        assert_eq!(
5145            child.source_name, "MyCte",
5146            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5147             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5148        );
5149    }
5150
5151    #[test]
5152    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
5153        // Known bug: same as above but with qualified column reference ("mycte".col).
5154        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
5155        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
5156        //
5157        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
5158        // this test should fail — update to assert source_name != "MyCte".
5159        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
5160        let node = lineage("col", &expr, None, false).unwrap();
5161        assert!(!node.downstream.is_empty());
5162        let child = &node.downstream[0];
5163        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
5164        assert_eq!(
5165            child.source_name, "MyCte",
5166            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5167             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5168        );
5169    }
5170
5171    #[test]
5172    fn test_lineage_recursive_cte_terminates_at_base_case() {
5173        let expr = parse_dialect(
5174            "WITH RECURSIVE nums AS (\
5175             SELECT 1 AS n \
5176             UNION ALL \
5177             SELECT n + 1 FROM nums WHERE n < 5\
5178             ) SELECT n FROM nums",
5179            DialectType::DuckDB,
5180        );
5181        let node = lineage("n", &expr, Some(DialectType::DuckDB), false).unwrap();
5182        let names = lineage_names(&node);
5183
5184        assert!(
5185            names.len() <= 12,
5186            "recursive CTE lineage should not unroll repeatedly, got {names:?}"
5187        );
5188        assert!(
5189            node.walk()
5190                .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "nums"),
5191            "expected recursive source to be marked as a CTE, got {names:?}"
5192        );
5193    }
5194
5195    #[test]
5196    fn test_lineage_window_partition_and_order_columns() {
5197        let expr = parse(
5198            "WITH c AS (SELECT user_id, ts FROM events) \
5199             SELECT ROW_NUMBER() OVER (PARTITION BY c.user_id ORDER BY c.ts) AS out FROM c",
5200        );
5201        let node = lineage("out", &expr, None, false).unwrap();
5202
5203        assert_lineage_contains(&node, "events.user_id");
5204        assert_lineage_contains(&node, "events.ts");
5205    }
5206
5207    #[test]
5208    fn test_lineage_window_aggregate_order_column() {
5209        let expr = parse(
5210            "WITH c AS (SELECT amount, d FROM txns) \
5211             SELECT SUM(c.amount) OVER (ORDER BY c.d) AS running FROM c",
5212        );
5213        let node = lineage("running", &expr, None, false).unwrap();
5214
5215        assert_lineage_contains(&node, "txns.amount");
5216        assert_lineage_contains(&node, "txns.d");
5217    }
5218
5219    #[test]
5220    fn test_lineage_named_window_columns() {
5221        let expr = parse(
5222            "SELECT ROW_NUMBER() OVER w AS out \
5223             FROM events \
5224             WINDOW w AS (PARTITION BY user_id ORDER BY ts)",
5225        );
5226        let node = lineage("out", &expr, None, false).unwrap();
5227
5228        assert_lineage_contains(&node, "events.user_id");
5229        assert_lineage_contains(&node, "events.ts");
5230    }
5231
5232    #[test]
5233    fn test_lineage_within_group_order_column() {
5234        let expr =
5235            parse("SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS p FROM txns");
5236        let node = lineage("p", &expr, None, false).unwrap();
5237
5238        assert_lineage_contains(&node, "txns.amount");
5239    }
5240
5241    #[test]
5242    fn test_lineage_query_wrappers_resolve_inner_select() {
5243        for sql in [
5244            "CREATE TABLE tgt AS SELECT x FROM src",
5245            "CREATE VIEW v AS SELECT x FROM src",
5246            "INSERT INTO tgt SELECT x FROM src",
5247        ] {
5248            let expr = parse(sql);
5249            let node = lineage("x", &expr, None, false).unwrap();
5250            assert_lineage_contains(&node, "src.x");
5251        }
5252    }
5253
5254    #[test]
5255    fn test_lineage_scalar_subquery_through_cte_reaches_base_table() {
5256        let expr = parse(
5257            "WITH c AS (SELECT x FROM t) \
5258             SELECT (SELECT SUM(x) FROM c) AS s FROM c LIMIT 1",
5259        );
5260        let node = lineage("s", &expr, None, false).unwrap();
5261
5262        assert_lineage_contains(&node, "t.x");
5263        assert!(
5264            node.walk()
5265                .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c"),
5266            "expected scalar subquery CTE hop in lineage, got {:?}",
5267            lineage_names(&node)
5268        );
5269    }
5270
5271    #[test]
5272    fn test_lineage_pivot_output_resolves_aggregation_input() {
5273        let expr = parse_dialect(
5274            "SELECT * FROM (SELECT region, q, amt FROM sales) \
5275             PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5276            DialectType::DuckDB,
5277        );
5278        let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5279
5280        assert_lineage_contains(&node, "sales.amt");
5281    }
5282
5283    #[test]
5284    fn test_lineage_pivot_through_cte_resolves_aggregation_input() {
5285        let expr = parse_dialect(
5286            "WITH src AS (SELECT region, q, amt FROM sales) \
5287             SELECT q1 FROM src PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5288            DialectType::DuckDB,
5289        );
5290        let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5291
5292        assert_lineage_contains(&node, "sales.amt");
5293    }
5294
5295    #[test]
5296    fn test_lineage_unpivot_value_resolves_input_columns() {
5297        let expr = parse_dialect(
5298            "SELECT name, val FROM t UNPIVOT(val FOR col IN (a, b, c))",
5299            DialectType::DuckDB,
5300        );
5301        let node = lineage("val", &expr, Some(DialectType::DuckDB), false).unwrap();
5302
5303        assert_lineage_contains(&node, "t.a");
5304        assert_lineage_contains(&node, "t.b");
5305        assert_lineage_contains(&node, "t.c");
5306    }
5307
5308    #[test]
5309    fn test_lineage_unpivot_multi_value_columns_resolve_positionally() {
5310        let expr = parse_dialect(
5311            "SELECT first_half_sales, second_half_sales, semester \
5312             FROM produce \
5313             UNPIVOT((first_half_sales, second_half_sales) \
5314             FOR semester IN ((q1, q2) AS 'semester_1', (q3, q4) AS 'semester_2'))",
5315            DialectType::BigQuery,
5316        );
5317
5318        let first = lineage(
5319            "first_half_sales",
5320            &expr,
5321            Some(DialectType::BigQuery),
5322            false,
5323        )
5324        .unwrap();
5325        assert_lineage_contains(&first, "produce.q1");
5326        assert_lineage_contains(&first, "produce.q3");
5327
5328        let second = lineage(
5329            "second_half_sales",
5330            &expr,
5331            Some(DialectType::BigQuery),
5332            false,
5333        )
5334        .unwrap();
5335        assert_lineage_contains(&second, "produce.q2");
5336        assert_lineage_contains(&second, "produce.q4");
5337    }
5338
5339    #[test]
5340    fn test_lineage_top_level_union_over_ctes_reaches_base_tables() {
5341        let expr = parse(
5342            "WITH a AS (SELECT x FROM t1), b AS (SELECT x FROM t2) \
5343             SELECT x FROM a UNION SELECT x FROM b",
5344        );
5345        let node = lineage("x", &expr, None, false).unwrap();
5346
5347        assert_lineage_contains(&node, "t1.x");
5348        assert_lineage_contains(&node, "t2.x");
5349    }
5350
5351    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
5352
5353    /// sqlglot: test_node_name_doesnt_contain_comment
5354    /// Comments in column expressions should not affect lineage resolution.
5355    /// NOTE: This test uses SELECT * from a derived table, which is a separate
5356    /// known limitation in polyglot-sql (star expansion in subqueries).
5357    #[test]
5358    #[ignore = "requires derived table star expansion (separate issue)"]
5359    fn test_node_name_doesnt_contain_comment() {
5360        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
5361        let node = lineage("x", &expr, None, false).unwrap();
5362
5363        assert_eq!(node.name, "x");
5364        assert!(!node.downstream.is_empty());
5365    }
5366
5367    /// A line comment between SELECT and the first column wraps the column
5368    /// in an Annotated node. Lineage must unwrap it to find the column name.
5369    /// Verify that commented and uncommented queries produce identical lineage.
5370    #[test]
5371    fn test_comment_before_first_column_in_cte() {
5372        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
5373        let sql_without_comment = "with t as (select 1 as a) select a from t";
5374
5375        // Without comment — baseline
5376        let expr_ok = parse(sql_without_comment);
5377        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
5378
5379        // With comment — should produce identical lineage
5380        let expr_comment = parse(sql_with_comment);
5381        let node_comment = lineage("a", &expr_comment, None, false)
5382            .expect("with comment before first column should succeed");
5383
5384        assert_eq!(node_ok.name, node_comment.name, "node names should match");
5385        assert_eq!(
5386            node_ok.downstream_names(),
5387            node_comment.downstream_names(),
5388            "downstream lineage should be identical with or without comment"
5389        );
5390    }
5391
5392    /// Block comment between SELECT and first column.
5393    #[test]
5394    fn test_block_comment_before_first_column() {
5395        let sql = "with t as (select 1 as a) select /* section */ a from t";
5396        let expr = parse(sql);
5397        let node = lineage("a", &expr, None, false)
5398            .expect("block comment before first column should succeed");
5399        assert_eq!(node.name, "a");
5400        assert!(
5401            !node.downstream.is_empty(),
5402            "should have downstream lineage"
5403        );
5404    }
5405
5406    /// Comment before first column should not affect second column resolution.
5407    #[test]
5408    fn test_comment_before_first_column_second_col_ok() {
5409        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
5410        let expr = parse(sql);
5411
5412        let node_a =
5413            lineage("a", &expr, None, false).expect("column a with comment should succeed");
5414        assert_eq!(node_a.name, "a");
5415
5416        let node_b =
5417            lineage("b", &expr, None, false).expect("column b with comment should succeed");
5418        assert_eq!(node_b.name, "b");
5419    }
5420
5421    /// Aliased column with preceding comment.
5422    #[test]
5423    fn test_comment_before_aliased_column() {
5424        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
5425        let expr = parse(sql);
5426        let node =
5427            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
5428        assert_eq!(node.name, "y");
5429        assert!(
5430            !node.downstream.is_empty(),
5431            "aliased column should have downstream lineage"
5432        );
5433    }
5434}