Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Semantic source kind for downstream consumers.
33    pub source_kind: SourceKind,
34    /// User-written source alias when different from canonical source name.
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub source_alias: Option<String>,
37    /// Optional reference node name (e.g., for CTEs)
38    pub reference_node_name: String,
39}
40
41impl LineageNode {
42    /// Create a new lineage node
43    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
44        Self {
45            name: name.into(),
46            expression,
47            source,
48            downstream: Vec::new(),
49            source_name: String::new(),
50            source_kind: SourceKind::Unknown,
51            source_alias: None,
52            reference_node_name: String::new(),
53        }
54    }
55
56    /// Iterate over all nodes in the lineage graph using DFS
57    pub fn walk(&self) -> LineageWalker<'_> {
58        LineageWalker { stack: vec![self] }
59    }
60
61    /// Get all downstream column names
62    pub fn downstream_names(&self) -> Vec<String> {
63        self.downstream.iter().map(|n| n.name.clone()).collect()
64    }
65}
66
67fn source_kind_for_scope_context(
68    scope: &Scope,
69    source_name: &str,
70    reference_node_name: &str,
71) -> SourceKind {
72    if source_name.is_empty() && reference_node_name.is_empty() {
73        return SourceKind::Root;
74    }
75    if let Some(source_info) = scope.sources.get(source_name) {
76        return source_info.kind;
77    }
78    if scope.cte_sources.contains_key(source_name) {
79        return SourceKind::Cte;
80    }
81    match scope.scope_type {
82        ScopeType::Cte => SourceKind::Cte,
83        ScopeType::DerivedTable => SourceKind::DerivedTable,
84        ScopeType::Udtf => SourceKind::Virtual,
85        _ => SourceKind::Unknown,
86    }
87}
88
89fn apply_scope_context(
90    node: &mut LineageNode,
91    scope: &Scope,
92    source_name: &str,
93    reference_node_name: &str,
94) {
95    node.source_name = source_name.to_string();
96    node.reference_node_name = reference_node_name.to_string();
97    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
98}
99
100/// Iterator for walking the lineage graph
101pub struct LineageWalker<'a> {
102    stack: Vec<&'a LineageNode>,
103}
104
105impl<'a> Iterator for LineageWalker<'a> {
106    type Item = &'a LineageNode;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        if let Some(node) = self.stack.pop() {
110            // Add children in reverse order so they're visited in order
111            for child in node.downstream.iter().rev() {
112                self.stack.push(child);
113            }
114            Some(node)
115        } else {
116            None
117        }
118    }
119}
120
121// ---------------------------------------------------------------------------
122// ColumnRef: name or positional index for column lookup
123// ---------------------------------------------------------------------------
124
125/// Column reference for lineage tracing — by name or positional index.
126enum ColumnRef<'a> {
127    Name(&'a str),
128    Index(usize),
129}
130
131// ---------------------------------------------------------------------------
132// Public API
133// ---------------------------------------------------------------------------
134
135/// Build the lineage graph for a column in a SQL query
136///
137/// # Arguments
138/// * `column` - The column name to trace lineage for
139/// * `sql` - The SQL expression (SELECT, UNION, etc.)
140/// * `dialect` - Optional dialect for parsing
141/// * `trim_selects` - If true, trim the source SELECT to only include the target column
142///
143/// # Returns
144/// The root lineage node for the specified column
145///
146/// # Example
147/// ```ignore
148/// use polyglot_sql::lineage::lineage;
149/// use polyglot_sql::parse_one;
150/// use polyglot_sql::DialectType;
151///
152/// let sql = "SELECT a, b + 1 AS c FROM t";
153/// let expr = parse_one(sql, DialectType::Generic).unwrap();
154/// let node = lineage("c", &expr, None, false).unwrap();
155/// ```
156pub fn lineage(
157    column: &str,
158    sql: &Expression,
159    dialect: Option<DialectType>,
160    trim_selects: bool,
161) -> Result<LineageNode> {
162    // Fast path: skip clone when there are no CTEs to expand
163    let effective_sql = lineage_effective_expression(sql);
164    let has_with = matches!(effective_sql, Expression::Select(s) if s.with.is_some());
165    if !has_with {
166        return lineage_from_expression(column, sql, dialect, trim_selects);
167    }
168    let mut owned = sql.clone();
169    expand_cte_stars(&mut owned, None);
170    lineage_from_expression(column, &owned, dialect, trim_selects)
171}
172
173/// Build the lineage graph for a column in a SQL query using optional schema metadata.
174///
175/// When `schema` is provided, the query is first qualified with
176/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
177/// ambiguous column references.
178///
179/// # Arguments
180/// * `column` - The column name to trace lineage for
181/// * `sql` - The SQL expression (SELECT, UNION, etc.)
182/// * `schema` - Optional schema used for qualification
183/// * `dialect` - Optional dialect for qualification and lineage handling
184/// * `trim_selects` - If true, trim the source SELECT to only include the target column
185///
186/// # Returns
187/// The root lineage node for the specified column
188pub fn lineage_with_schema(
189    column: &str,
190    sql: &Expression,
191    schema: Option<&dyn Schema>,
192    dialect: Option<DialectType>,
193    trim_selects: bool,
194) -> Result<LineageNode> {
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new().with_dialect(dialect_type)
198        } else {
199            QualifyColumnsOptions::new()
200        };
201
202        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
203            Error::internal(format!("Lineage qualification failed with schema: {}", e))
204        })?
205    } else {
206        sql.clone()
207    };
208
209    // Annotate types in-place so lineage nodes carry type information
210    annotate_types(&mut qualified_expression, schema, dialect);
211
212    // Expand CTE stars on the already-owned expression (no extra clone).
213    // Pass schema so that stars from external tables can also be resolved.
214    expand_cte_stars(&mut qualified_expression, schema);
215
216    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220    column: &str,
221    sql: &Expression,
222    dialect: Option<DialectType>,
223    trim_selects: bool,
224) -> Result<LineageNode> {
225    let sql = lineage_effective_expression(sql);
226    let scope = build_scope(sql);
227    to_node(
228        ColumnRef::Name(column),
229        &scope,
230        dialect,
231        "",
232        "",
233        "",
234        trim_selects,
235    )
236}
237
238fn lineage_effective_expression(sql: &Expression) -> &Expression {
239    match sql {
240        Expression::Prepare(prepare) => &prepare.statement,
241        _ => sql,
242    }
243}
244
245// ---------------------------------------------------------------------------
246// CTE star expansion
247// ---------------------------------------------------------------------------
248
249/// Normalize an identifier for CTE name matching.
250///
251/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
252/// quoted identifiers preserve their original case. This matches sqlglot's
253/// `normalize_identifiers` behavior.
254fn normalize_cte_name(ident: &Identifier) -> String {
255    if ident.quoted {
256        ident.name.clone()
257    } else {
258        ident.name.to_lowercase()
259    }
260}
261
262/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
263/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
264/// which qualify_columns cannot resolve because it processes each SELECT independently.
265///
266/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
267/// by looking up column names in the schema. This enables correct expansion of patterns
268/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
269///
270/// CTE name matching follows SQL identifier semantics: unquoted names are compared
271/// case-insensitively (lowercased), while quoted names preserve their original case.
272/// This matches sqlglot's `normalize_identifiers` behavior.
273pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
274    if let Expression::Prepare(prepare) = expr {
275        expand_cte_stars(&mut prepare.statement, schema);
276        return;
277    }
278
279    let select = match expr {
280        Expression::Select(s) => s,
281        _ => return,
282    };
283
284    let with = match &mut select.with {
285        Some(w) => w,
286        None => return,
287    };
288
289    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
290
291    for cte in &mut with.ctes {
292        let cte_name = normalize_cte_name(&cte.alias);
293
294        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
295        if !cte.columns.is_empty() {
296            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
297            resolved_cte_columns.insert(cte_name, cols);
298            continue;
299        }
300
301        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
302        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
303        // references itself. We detect this conservatively: if the CTE name appears as
304        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
305        // block are still expanded.
306        if with.recursive {
307            let is_self_referencing =
308                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
309                    let body_sources = get_select_sources(body_select);
310                    body_sources.iter().any(|s| s.normalized == cte_name)
311                } else {
312                    false
313                };
314            if is_self_referencing {
315                continue;
316            }
317        }
318
319        // Get the SELECT from the CTE body (handle UNION by taking left branch)
320        let body_select = match get_leftmost_select_mut(&mut cte.this) {
321            Some(s) => s,
322            None => continue,
323        };
324
325        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
326        resolved_cte_columns.insert(cte_name, columns);
327    }
328
329    // Also expand stars in the outer SELECT itself
330    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
331}
332
333/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
334///
335/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
336/// are determined by the left branch. This matches sqlglot's behavior.
337fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
338    let mut current = expr;
339    for _ in 0..MAX_LINEAGE_DEPTH {
340        match current {
341            Expression::Select(s) => return Some(s),
342            Expression::Union(u) => current = &mut u.left,
343            Expression::Intersect(i) => current = &mut i.left,
344            Expression::Except(e) => current = &mut e.left,
345            Expression::Paren(p) => current = &mut p.this,
346            _ => return None,
347        }
348    }
349    None
350}
351
352/// Rewrite star expressions in a SELECT using resolved CTE column lists.
353/// Falls back to `schema` for external table column lookup.
354/// Returns the list of output column names after expansion.
355fn rewrite_stars_in_select(
356    select: &mut Select,
357    resolved_ctes: &HashMap<String, Vec<String>>,
358    schema: Option<&dyn Schema>,
359) -> Vec<String> {
360    // The AST represents star expressions in two forms depending on syntax:
361    //   - `SELECT *`      → Expression::Star (unqualified star)
362    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
363    // Both must be checked to handle all star patterns.
364    let has_star = select
365        .expressions
366        .iter()
367        .any(|e| matches!(e, Expression::Star(_)));
368    let has_qualified_star = select
369        .expressions
370        .iter()
371        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
372
373    if !has_star && !has_qualified_star {
374        // No stars — just extract column names without rewriting
375        return select
376            .expressions
377            .iter()
378            .filter_map(get_expression_output_name)
379            .collect();
380    }
381
382    let sources = get_select_sources(select);
383    let mut new_expressions = Vec::new();
384    let mut result_columns = Vec::new();
385
386    for expr in &select.expressions {
387        match expr {
388            Expression::Star(star) => {
389                let qual = star.table.as_ref();
390                if let Some(expanded) =
391                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
392                {
393                    for (src_alias, col_name) in &expanded {
394                        let table_id = Identifier::new(src_alias);
395                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
396                        result_columns.push(col_name.clone());
397                    }
398                } else {
399                    new_expressions.push(expr.clone());
400                    result_columns.push("*".to_string());
401                }
402            }
403            Expression::Column(c) if c.name.name == "*" => {
404                let qual = c.table.as_ref();
405                if let Some(expanded) =
406                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
407                {
408                    for (_src_alias, col_name) in &expanded {
409                        // Keep the original table qualifier for qualified stars (table.*)
410                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
411                        result_columns.push(col_name.clone());
412                    }
413                } else {
414                    new_expressions.push(expr.clone());
415                    result_columns.push("*".to_string());
416                }
417            }
418            _ => {
419                new_expressions.push(expr.clone());
420                if let Some(name) = get_expression_output_name(expr) {
421                    result_columns.push(name);
422                }
423            }
424        }
425    }
426
427    select.expressions = new_expressions;
428    result_columns
429}
430
431/// Try to expand a star expression by looking up source columns from resolved CTEs,
432/// falling back to the schema for external tables.
433/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
434/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
435fn expand_star_from_sources(
436    qualifier: Option<&Identifier>,
437    sources: &[SourceInfo],
438    resolved_ctes: &HashMap<String, Vec<String>>,
439    schema: Option<&dyn Schema>,
440) -> Option<Vec<(String, String)>> {
441    let mut expanded = Vec::new();
442
443    if let Some(qual) = qualifier {
444        // Qualified star: table.*
445        let qual_normalized = normalize_cte_name(qual);
446        for src in sources {
447            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
448                // Try CTE first
449                if let Some(cols) = resolved_ctes.get(&src.normalized) {
450                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
451                    return Some(expanded);
452                }
453                // Fall back to schema
454                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
455                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
456                    return Some(expanded);
457                }
458            }
459        }
460        None
461    } else {
462        // Unqualified star: expand all sources.
463        // Intentionally conservative: if any source can't be resolved, the entire
464        // expansion is aborted. Partial expansion would produce an incomplete column
465        // list, causing downstream lineage resolution to silently omit columns.
466        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
467        let mut any_expanded = false;
468        for src in sources {
469            if let Some(cols) = resolved_ctes.get(&src.normalized) {
470                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
471                any_expanded = true;
472            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
473                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
474                any_expanded = true;
475            } else {
476                return None;
477            }
478        }
479        if any_expanded {
480            Some(expanded)
481        } else {
482            None
483        }
484    }
485}
486
487/// Look up column names for a table from the schema.
488fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
489    let schema = schema?;
490    if fq_name.is_empty() {
491        return None;
492    }
493    schema
494        .column_names(fq_name)
495        .ok()
496        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
497}
498
499/// Create a Column expression with the given name and optional table qualifier.
500fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
501    Expression::Column(Box::new(crate::expressions::Column {
502        name: Identifier::new(name),
503        table: table.cloned(),
504        join_mark: false,
505        trailing_comments: Vec::new(),
506        span: None,
507        inferred_type: None,
508    }))
509}
510
511/// Extract the output name of a SELECT expression.
512fn get_expression_output_name(expr: &Expression) -> Option<String> {
513    match expr {
514        Expression::Alias(a) => Some(a.alias.name.clone()),
515        Expression::Column(c) => Some(c.name.name.clone()),
516        Expression::Identifier(id) => Some(id.name.clone()),
517        Expression::Star(_) => Some("*".to_string()),
518        _ => None,
519    }
520}
521
522/// Source info extracted from a SELECT's FROM/JOIN clauses in a single pass.
523struct SourceInfo {
524    alias: String,
525    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
526    normalized: String,
527    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
528    fq_name: String,
529}
530
531/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
532/// SELECT's FROM and JOIN clauses in a single pass.
533fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
534    let mut sources = Vec::new();
535
536    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
537        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
538            SourceInfo {
539                alias: alias.name.clone(),
540                normalized: normalize_cte_name(alias),
541                fq_name: alias.name.clone(),
542            }
543        }
544
545        match expr {
546            Expression::Table(t) => {
547                let normalized = normalize_cte_name(&t.name);
548                let alias = t
549                    .alias
550                    .as_ref()
551                    .map(|a| a.name.clone())
552                    .unwrap_or_else(|| t.name.name.clone());
553                let mut parts = Vec::new();
554                if let Some(catalog) = &t.catalog {
555                    parts.push(catalog.name.clone());
556                }
557                if let Some(schema) = &t.schema {
558                    parts.push(schema.name.clone());
559                }
560                parts.push(t.name.name.clone());
561                let fq_name = parts.join(".");
562                Some(SourceInfo {
563                    alias,
564                    normalized,
565                    fq_name,
566                })
567            }
568            Expression::Subquery(s) => {
569                let alias = s.alias.as_ref()?.name.clone();
570                let normalized = alias.to_lowercase();
571                let fq_name = alias.clone();
572                Some(SourceInfo {
573                    alias,
574                    normalized,
575                    fq_name,
576                })
577            }
578            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
579            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
580                Some(virtual_source_info(&a.alias))
581            }
582            Expression::Paren(p) => extract_source(&p.this),
583            _ => None,
584        }
585    }
586
587    if let Some(from) = &select.from {
588        for expr in &from.expressions {
589            if let Some(info) = extract_source(expr) {
590                sources.push(info);
591            }
592        }
593    }
594    for join in &select.joins {
595        if let Some(info) = extract_source(&join.this) {
596            sources.push(info);
597        }
598    }
599    sources
600}
601
602/// Get all source tables from a lineage graph
603pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
604    let mut tables = HashSet::new();
605    collect_source_tables(node, &mut tables);
606    tables
607}
608
609/// Recursively collect source table names from lineage graph
610pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
611    if let Expression::Table(table) = &node.source {
612        tables.insert(table.name.name.clone());
613    }
614    for child in &node.downstream {
615        collect_source_tables(child, tables);
616    }
617}
618
619// ---------------------------------------------------------------------------
620// Core recursive lineage builder
621// ---------------------------------------------------------------------------
622
623/// Maximum recursion depth for lineage tracing to prevent stack overflow
624/// on circular or deeply nested CTE chains.
625const MAX_LINEAGE_DEPTH: usize = 64;
626
627/// Recursively build a lineage node for a column in a scope.
628fn to_node(
629    column: ColumnRef<'_>,
630    scope: &Scope,
631    dialect: Option<DialectType>,
632    scope_name: &str,
633    source_name: &str,
634    reference_node_name: &str,
635    trim_selects: bool,
636) -> Result<LineageNode> {
637    to_node_inner(
638        column,
639        scope,
640        dialect,
641        scope_name,
642        source_name,
643        reference_node_name,
644        trim_selects,
645        &[],
646        0,
647    )
648}
649
650fn to_node_inner(
651    column: ColumnRef<'_>,
652    scope: &Scope,
653    dialect: Option<DialectType>,
654    scope_name: &str,
655    source_name: &str,
656    reference_node_name: &str,
657    trim_selects: bool,
658    ancestor_cte_scopes: &[Scope],
659    depth: usize,
660) -> Result<LineageNode> {
661    if depth > MAX_LINEAGE_DEPTH {
662        return Err(Error::internal(format!(
663            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
664        )));
665    }
666    let scope_expr = &scope.expression;
667
668    // Build combined CTE scopes: current scope's cte_scopes + ancestors
669    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
670    for s in ancestor_cte_scopes {
671        all_cte_scopes.push(s);
672    }
673
674    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
675    //    but we need the inner query (SELECT/UNION) for column lookup.
676    let effective_expr = match scope_expr {
677        Expression::Cte(cte) => &cte.this,
678        other => other,
679    };
680
681    // 1. Set operations (UNION / INTERSECT / EXCEPT)
682    if matches!(
683        effective_expr,
684        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
685    ) {
686        // For CTE wrapping a set op, create a temporary scope with the inner expression
687        if matches!(scope_expr, Expression::Cte(_)) {
688            let mut inner_scope = Scope::new(effective_expr.clone());
689            inner_scope.union_scopes = scope.union_scopes.clone();
690            inner_scope.sources = scope.sources.clone();
691            inner_scope.cte_sources = scope.cte_sources.clone();
692            inner_scope.cte_scopes = scope.cte_scopes.clone();
693            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
694            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
695            return handle_set_operation(
696                &column,
697                &inner_scope,
698                dialect,
699                scope_name,
700                source_name,
701                reference_node_name,
702                trim_selects,
703                ancestor_cte_scopes,
704                depth,
705            );
706        }
707        return handle_set_operation(
708            &column,
709            scope,
710            dialect,
711            scope_name,
712            source_name,
713            reference_node_name,
714            trim_selects,
715            ancestor_cte_scopes,
716            depth,
717        );
718    }
719
720    // 2. Find the select expression for this column
721    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
722    let column_name = resolve_column_name(&column, &select_expr);
723
724    // 3. Trim source if requested
725    let node_source = if trim_selects {
726        trim_source(effective_expr, &select_expr)
727    } else {
728        effective_expr.clone()
729    };
730
731    // 4. Create the lineage node
732    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
733    apply_scope_context(&mut node, scope, source_name, reference_node_name);
734
735    // 5. Star handling — add downstream for each source
736    if matches!(&select_expr, Expression::Star(_)) {
737        for (name, source_info) in &scope.sources {
738            let mut child = LineageNode::new(
739                format!("{}.*", name),
740                Expression::Star(crate::expressions::Star {
741                    table: None,
742                    except: None,
743                    replace: None,
744                    rename: None,
745                    trailing_comments: vec![],
746                    span: None,
747                }),
748                source_info.expression.clone(),
749            );
750            apply_source_info_context(&mut child, name, source_info);
751            node.downstream.push(child);
752        }
753        return Ok(node);
754    }
755
756    // 6. Subqueries in select — trace through scalar subqueries
757    let subqueries: Vec<&Expression> =
758        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
759    for sq_expr in subqueries {
760        if let Expression::Subquery(sq) = sq_expr {
761            for sq_scope in &scope.subquery_scopes {
762                if sq_scope.expression == sq.this {
763                    if let Ok(child) = to_node_inner(
764                        ColumnRef::Index(0),
765                        sq_scope,
766                        dialect,
767                        &column_name,
768                        "",
769                        "",
770                        trim_selects,
771                        ancestor_cte_scopes,
772                        depth + 1,
773                    ) {
774                        node.downstream.push(child);
775                    }
776                    break;
777                }
778            }
779        }
780    }
781
782    // 7. Column references — trace each column to its source
783    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
784    for col_ref in col_refs {
785        let col_name = &col_ref.column;
786        if let Some(ref table_id) = col_ref.table {
787            let tbl = &table_id.name;
788            resolve_qualified_column(
789                &mut node,
790                scope,
791                dialect,
792                tbl,
793                col_name,
794                &column_name,
795                trim_selects,
796                &all_cte_scopes,
797                depth,
798            );
799        } else {
800            resolve_unqualified_column(
801                &mut node,
802                scope,
803                dialect,
804                col_name,
805                &column_name,
806                trim_selects,
807                &all_cte_scopes,
808                depth,
809            );
810        }
811    }
812
813    Ok(node)
814}
815
816// ---------------------------------------------------------------------------
817// Set operation handling
818// ---------------------------------------------------------------------------
819
820fn handle_set_operation(
821    column: &ColumnRef<'_>,
822    scope: &Scope,
823    dialect: Option<DialectType>,
824    scope_name: &str,
825    source_name: &str,
826    reference_node_name: &str,
827    trim_selects: bool,
828    ancestor_cte_scopes: &[Scope],
829    depth: usize,
830) -> Result<LineageNode> {
831    let scope_expr = &scope.expression;
832
833    // Determine column index
834    let col_index = match column {
835        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
836        ColumnRef::Index(i) => *i,
837    };
838
839    let col_name = match column {
840        ColumnRef::Name(name) => name.to_string(),
841        ColumnRef::Index(_) => format!("_{col_index}"),
842    };
843
844    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
845    apply_scope_context(&mut node, scope, source_name, reference_node_name);
846
847    // Recurse into each union branch
848    for branch_scope in &scope.union_scopes {
849        if let Ok(child) = to_node_inner(
850            ColumnRef::Index(col_index),
851            branch_scope,
852            dialect,
853            scope_name,
854            "",
855            "",
856            trim_selects,
857            ancestor_cte_scopes,
858            depth + 1,
859        ) {
860            node.downstream.push(child);
861        }
862    }
863
864    Ok(node)
865}
866
867// ---------------------------------------------------------------------------
868// Column resolution helpers
869// ---------------------------------------------------------------------------
870
871fn resolve_qualified_column(
872    node: &mut LineageNode,
873    scope: &Scope,
874    dialect: Option<DialectType>,
875    table: &str,
876    col_name: &str,
877    parent_name: &str,
878    trim_selects: bool,
879    all_cte_scopes: &[&Scope],
880    depth: usize,
881) {
882    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
883    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
884    let resolved_cte_name = resolve_cte_alias(scope, table);
885    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
886
887    // Check if table is a CTE reference — check both the current scope's cte_sources
888    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
889    let is_cte = scope.cte_sources.contains_key(effective_table)
890        || all_cte_scopes.iter().any(
891            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
892        );
893    if is_cte {
894        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
895            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
896            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
897            if let Ok(child) = to_node_inner(
898                ColumnRef::Name(col_name),
899                child_scope,
900                dialect,
901                parent_name,
902                effective_table,
903                parent_name,
904                trim_selects,
905                &ancestors,
906                depth + 1,
907            ) {
908                node.downstream.push(child);
909                return;
910            }
911        }
912    }
913
914    // Check if table is a derived table (is_scope = true in sources)
915    if let Some(source_info) = scope.sources.get(table) {
916        if source_info.is_scope {
917            if let Some(child_scope) = find_child_scope(scope, table) {
918                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
919                if let Ok(child) = to_node_inner(
920                    ColumnRef::Name(col_name),
921                    child_scope,
922                    dialect,
923                    parent_name,
924                    table,
925                    parent_name,
926                    trim_selects,
927                    &ancestors,
928                    depth + 1,
929                ) {
930                    node.downstream.push(child);
931                    return;
932                }
933            }
934        }
935    }
936
937    // Base table source found in current scope: preserve alias in the display name
938    // but store the resolved table expression and name for downstream consumers.
939    if let Some(source_info) = scope.sources.get(table) {
940        if !source_info.is_scope {
941            let mut child = make_table_column_node_from_source(table, col_name, source_info);
942            if source_info.kind == SourceKind::Virtual {
943                attach_virtual_source_dependencies(
944                    &mut child,
945                    scope,
946                    dialect,
947                    table,
948                    &source_info.expression,
949                    trim_selects,
950                    all_cte_scopes,
951                    depth,
952                );
953            }
954            node.downstream.push(child);
955            return;
956        }
957    }
958
959    // Base table or unresolved — terminal node
960    node.downstream
961        .push(make_table_column_node(table, col_name));
962}
963
964/// Resolve a FROM alias to the original CTE name.
965///
966/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
967/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
968/// This function checks if `name` is such an alias and returns the CTE name.
969fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
970    // If it's already a known CTE name, no resolution needed
971    if scope.cte_sources.contains_key(name) {
972        return None;
973    }
974    // Check if the source's expression is a CTE — if so, extract the CTE name
975    if let Some(source_info) = scope.sources.get(name) {
976        if source_info.is_scope {
977            if let Expression::Cte(cte) = &source_info.expression {
978                let cte_name = &cte.alias.name;
979                if scope.cte_sources.contains_key(cte_name) {
980                    return Some(cte_name.clone());
981                }
982            }
983        }
984    }
985    None
986}
987
988fn resolve_unqualified_column(
989    node: &mut LineageNode,
990    scope: &Scope,
991    dialect: Option<DialectType>,
992    col_name: &str,
993    parent_name: &str,
994    trim_selects: bool,
995    all_cte_scopes: &[&Scope],
996    depth: usize,
997) {
998    // Try to find which source this column belongs to.
999    // Build the source list from the actual FROM/JOIN clauses to avoid
1000    // mixing in CTE definitions that are in scope but not referenced.
1001    let from_source_names = source_names_from_from_join(scope);
1002
1003    if from_source_names.len() == 1 {
1004        let tbl = &from_source_names[0];
1005        resolve_qualified_column(
1006            node,
1007            scope,
1008            dialect,
1009            tbl,
1010            col_name,
1011            parent_name,
1012            trim_selects,
1013            all_cte_scopes,
1014            depth,
1015        );
1016        return;
1017    }
1018
1019    // Multiple sources — can't resolve without schema info, add unqualified node
1020    let child = LineageNode::new(
1021        col_name.to_string(),
1022        Expression::Column(Box::new(crate::expressions::Column {
1023            name: crate::expressions::Identifier::new(col_name.to_string()),
1024            table: None,
1025            join_mark: false,
1026            trailing_comments: vec![],
1027            span: None,
1028            inferred_type: None,
1029        })),
1030        node.source.clone(),
1031    );
1032    node.downstream.push(child);
1033}
1034
1035fn attach_virtual_source_dependencies(
1036    node: &mut LineageNode,
1037    scope: &Scope,
1038    dialect: Option<DialectType>,
1039    source_alias: &str,
1040    source_expr: &Expression,
1041    trim_selects: bool,
1042    all_cte_scopes: &[&Scope],
1043    depth: usize,
1044) {
1045    let parent_name = node.name.clone();
1046    let mut seen = HashSet::new();
1047    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1048        let key = (
1049            col_ref.table.as_ref().map(|t| t.name.clone()),
1050            col_ref.column.clone(),
1051        );
1052        if !seen.insert(key) {
1053            continue;
1054        }
1055
1056        if let Some(table_id) = col_ref.table {
1057            let table = table_id.name;
1058            if table == source_alias {
1059                continue;
1060            }
1061            resolve_qualified_column(
1062                node,
1063                scope,
1064                dialect,
1065                &table,
1066                &col_ref.column,
1067                &parent_name,
1068                trim_selects,
1069                all_cte_scopes,
1070                depth + 1,
1071            );
1072        } else {
1073            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
1074            if non_virtual_sources.len() == 1 {
1075                resolve_qualified_column(
1076                    node,
1077                    scope,
1078                    dialect,
1079                    &non_virtual_sources[0],
1080                    &col_ref.column,
1081                    &parent_name,
1082                    trim_selects,
1083                    all_cte_scopes,
1084                    depth + 1,
1085                );
1086            }
1087        }
1088    }
1089}
1090
1091fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
1092    fn source_name(expr: &Expression) -> Option<String> {
1093        match expr {
1094            Expression::Table(table) => Some(
1095                table
1096                    .alias
1097                    .as_ref()
1098                    .map(|a| a.name.clone())
1099                    .unwrap_or_else(|| table.name.name.clone()),
1100            ),
1101            Expression::Subquery(subquery) => {
1102                subquery.alias.as_ref().map(|alias| alias.name.clone())
1103            }
1104            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
1105            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1106                Some(alias.alias.name.clone())
1107            }
1108            Expression::Paren(paren) => source_name(&paren.this),
1109            _ => None,
1110        }
1111    }
1112
1113    let effective_expr = match &scope.expression {
1114        Expression::Cte(cte) => &cte.this,
1115        expr => expr,
1116    };
1117
1118    let mut names = Vec::new();
1119    let mut seen = std::collections::HashSet::new();
1120
1121    if let Expression::Select(select) = effective_expr {
1122        if let Some(from) = &select.from {
1123            for expr in &from.expressions {
1124                if let Some(name) = source_name(expr) {
1125                    if !name.is_empty() && seen.insert(name.clone()) {
1126                        names.push(name);
1127                    }
1128                }
1129            }
1130        }
1131        for join in &select.joins {
1132            if let Some(name) = source_name(&join.this) {
1133                if !name.is_empty() && seen.insert(name.clone()) {
1134                    names.push(name);
1135                }
1136            }
1137        }
1138    }
1139
1140    names
1141}
1142
1143fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
1144    source_names_from_from_join(scope)
1145        .into_iter()
1146        .filter(|name| {
1147            !matches!(
1148                scope.sources.get(name).map(|source| source.kind),
1149                Some(SourceKind::Virtual)
1150            )
1151        })
1152        .collect()
1153}
1154
1155// ---------------------------------------------------------------------------
1156// Helper functions
1157// ---------------------------------------------------------------------------
1158
1159/// Get the alias or name of an expression
1160fn get_alias_or_name(expr: &Expression) -> Option<String> {
1161    match expr {
1162        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1163        Expression::Column(col) => Some(col.name.name.clone()),
1164        Expression::Identifier(id) => Some(id.name.clone()),
1165        Expression::Star(_) => Some("*".to_string()),
1166        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1167        // Unwrap to get the actual column/alias name from the inner expression.
1168        Expression::Annotated(a) => get_alias_or_name(&a.this),
1169        _ => None,
1170    }
1171}
1172
1173/// Resolve the display name for a column reference.
1174fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1175    match column {
1176        ColumnRef::Name(n) => n.to_string(),
1177        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1178    }
1179}
1180
1181/// Find the select expression matching a column reference.
1182fn find_select_expr(
1183    scope_expr: &Expression,
1184    column: &ColumnRef<'_>,
1185    dialect: Option<DialectType>,
1186) -> Result<Expression> {
1187    if let Expression::Select(ref select) = scope_expr {
1188        match column {
1189            ColumnRef::Name(name) => {
1190                let normalized_name = normalize_column_name(name, dialect);
1191                for expr in &select.expressions {
1192                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1193                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1194                            return Ok(expr.clone());
1195                        }
1196                    }
1197                }
1198                Err(crate::error::Error::parse(
1199                    format!("Cannot find column '{}' in query", name),
1200                    0,
1201                    0,
1202                    0,
1203                    0,
1204                ))
1205            }
1206            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1207                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1208            }),
1209        }
1210    } else {
1211        Err(crate::error::Error::parse(
1212            "Expected SELECT expression for column lookup",
1213            0,
1214            0,
1215            0,
1216            0,
1217        ))
1218    }
1219}
1220
1221/// Find the positional index of a column name in a set operation's first SELECT branch.
1222fn column_to_index(
1223    set_op_expr: &Expression,
1224    name: &str,
1225    dialect: Option<DialectType>,
1226) -> Result<usize> {
1227    let normalized_name = normalize_column_name(name, dialect);
1228    let mut expr = set_op_expr;
1229    loop {
1230        match expr {
1231            Expression::Union(u) => expr = &u.left,
1232            Expression::Intersect(i) => expr = &i.left,
1233            Expression::Except(e) => expr = &e.left,
1234            Expression::Select(select) => {
1235                for (i, e) in select.expressions.iter().enumerate() {
1236                    if let Some(alias_or_name) = get_alias_or_name(e) {
1237                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1238                            return Ok(i);
1239                        }
1240                    }
1241                }
1242                return Err(crate::error::Error::parse(
1243                    format!("Cannot find column '{}' in set operation", name),
1244                    0,
1245                    0,
1246                    0,
1247                    0,
1248                ));
1249            }
1250            _ => {
1251                return Err(crate::error::Error::parse(
1252                    "Expected SELECT or set operation",
1253                    0,
1254                    0,
1255                    0,
1256                    0,
1257                ))
1258            }
1259        }
1260    }
1261}
1262
1263fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1264    normalize_name(name, dialect, false, true)
1265}
1266
1267/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1268fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1269    if let Expression::Select(select) = select_expr {
1270        let mut trimmed = select.as_ref().clone();
1271        trimmed.expressions = vec![target_expr.clone()];
1272        Expression::Select(Box::new(trimmed))
1273    } else {
1274        select_expr.clone()
1275    }
1276}
1277
1278/// Find the child scope (CTE or derived table) for a given source name.
1279fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1280    // Check CTE scopes
1281    if scope.cte_sources.contains_key(source_name) {
1282        for cte_scope in &scope.cte_scopes {
1283            if let Expression::Cte(cte) = &cte_scope.expression {
1284                if cte.alias.name == source_name {
1285                    return Some(cte_scope);
1286                }
1287            }
1288        }
1289    }
1290
1291    // Check derived table scopes
1292    if let Some(source_info) = scope.sources.get(source_name) {
1293        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1294            if let Expression::Subquery(sq) = &source_info.expression {
1295                for dt_scope in &scope.derived_table_scopes {
1296                    if dt_scope.expression == sq.this {
1297                        return Some(dt_scope);
1298                    }
1299                }
1300            }
1301        }
1302    }
1303
1304    None
1305}
1306
1307/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1308/// This handles nested CTEs where the current scope doesn't have the CTE scope
1309/// as a direct child but knows about it via cte_sources.
1310fn find_child_scope_in<'a>(
1311    all_cte_scopes: &[&'a Scope],
1312    scope: &'a Scope,
1313    source_name: &str,
1314) -> Option<&'a Scope> {
1315    // First try the scope's own cte_scopes
1316    for cte_scope in &scope.cte_scopes {
1317        if let Expression::Cte(cte) = &cte_scope.expression {
1318            if cte.alias.name == source_name {
1319                return Some(cte_scope);
1320            }
1321        }
1322    }
1323
1324    // Then search through all ancestor CTE scopes
1325    for cte_scope in all_cte_scopes {
1326        if let Expression::Cte(cte) = &cte_scope.expression {
1327            if cte.alias.name == source_name {
1328                return Some(cte_scope);
1329            }
1330        }
1331    }
1332
1333    // Fall back to derived table scopes
1334    if let Some(source_info) = scope.sources.get(source_name) {
1335        if source_info.is_scope {
1336            if let Expression::Subquery(sq) = &source_info.expression {
1337                for dt_scope in &scope.derived_table_scopes {
1338                    if dt_scope.expression == sq.this {
1339                        return Some(dt_scope);
1340                    }
1341                }
1342            }
1343        }
1344    }
1345
1346    None
1347}
1348
1349/// Create a terminal lineage node for a table.column reference.
1350fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1351    let mut node = LineageNode::new(
1352        format!("{}.{}", table, column),
1353        Expression::Column(Box::new(crate::expressions::Column {
1354            name: crate::expressions::Identifier::new(column.to_string()),
1355            table: Some(crate::expressions::Identifier::new(table.to_string())),
1356            join_mark: false,
1357            trailing_comments: vec![],
1358            span: None,
1359            inferred_type: None,
1360        })),
1361        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1362    );
1363    node.source_name = table.to_string();
1364    node.source_kind = SourceKind::Table;
1365    node
1366}
1367
1368fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1369    let mut parts: Vec<String> = Vec::new();
1370    if let Some(catalog) = &table_ref.catalog {
1371        parts.push(catalog.name.clone());
1372    }
1373    if let Some(schema) = &table_ref.schema {
1374        parts.push(schema.name.clone());
1375    }
1376    parts.push(table_ref.name.name.clone());
1377    parts.join(".")
1378}
1379
1380fn apply_source_info_context(
1381    node: &mut LineageNode,
1382    source_key: &str,
1383    source_info: &ScopeSourceInfo,
1384) {
1385    node.source_kind = source_info.kind;
1386    node.source_name =
1387        source_info
1388            .lineage_name
1389            .clone()
1390            .unwrap_or_else(|| match &source_info.expression {
1391                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
1392                _ => source_key.to_string(),
1393            });
1394    node.source_alias = source_info.alias.clone();
1395}
1396
1397fn make_table_column_node_from_source(
1398    source_key: &str,
1399    column: &str,
1400    source_info: &ScopeSourceInfo,
1401) -> LineageNode {
1402    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
1403    let mut node = LineageNode::new(
1404        format!("{}.{}", lineage_name, column),
1405        Expression::Column(Box::new(crate::expressions::Column {
1406            name: crate::expressions::Identifier::new(column.to_string()),
1407            table: Some(crate::expressions::Identifier::new(
1408                lineage_name.to_string(),
1409            )),
1410            join_mark: false,
1411            trailing_comments: vec![],
1412            span: None,
1413            inferred_type: None,
1414        })),
1415        source_info.expression.clone(),
1416    );
1417
1418    apply_source_info_context(&mut node, source_key, source_info);
1419
1420    node
1421}
1422
1423/// Simple column reference extracted from an expression
1424#[derive(Debug, Clone)]
1425struct SimpleColumnRef {
1426    table: Option<crate::expressions::Identifier>,
1427    column: String,
1428}
1429
1430/// Find all column references in an expression (does not recurse into subqueries).
1431fn find_column_refs_in_expr(
1432    expr: &Expression,
1433    dialect: Option<DialectType>,
1434) -> Vec<SimpleColumnRef> {
1435    let mut refs = Vec::new();
1436    collect_column_refs(expr, dialect, &mut refs);
1437    refs
1438}
1439
1440fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1441    match expr {
1442        Expression::Column(col) => {
1443            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1444        }
1445        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1446        _ => false,
1447    }
1448}
1449
1450fn collect_column_refs(
1451    expr: &Expression,
1452    dialect: Option<DialectType>,
1453    refs: &mut Vec<SimpleColumnRef>,
1454) {
1455    let mut stack: Vec<&Expression> = vec![expr];
1456
1457    while let Some(current) = stack.pop() {
1458        match current {
1459            // === Leaf: collect Column references ===
1460            Expression::Column(col) => {
1461                refs.push(SimpleColumnRef {
1462                    table: col.table.clone(),
1463                    column: col.name.name.clone(),
1464                });
1465            }
1466
1467            // === Boundary: don't recurse into subqueries (handled separately) ===
1468            Expression::Subquery(_) | Expression::Exists(_) => {}
1469
1470            // === BinaryOp variants: left, right ===
1471            Expression::And(op)
1472            | Expression::Or(op)
1473            | Expression::Eq(op)
1474            | Expression::Neq(op)
1475            | Expression::Lt(op)
1476            | Expression::Lte(op)
1477            | Expression::Gt(op)
1478            | Expression::Gte(op)
1479            | Expression::Add(op)
1480            | Expression::Sub(op)
1481            | Expression::Mul(op)
1482            | Expression::Div(op)
1483            | Expression::Mod(op)
1484            | Expression::BitwiseAnd(op)
1485            | Expression::BitwiseOr(op)
1486            | Expression::BitwiseXor(op)
1487            | Expression::BitwiseLeftShift(op)
1488            | Expression::BitwiseRightShift(op)
1489            | Expression::Concat(op)
1490            | Expression::Adjacent(op)
1491            | Expression::TsMatch(op)
1492            | Expression::PropertyEQ(op)
1493            | Expression::ArrayContainsAll(op)
1494            | Expression::ArrayContainedBy(op)
1495            | Expression::ArrayOverlaps(op)
1496            | Expression::JSONBContainsAllTopKeys(op)
1497            | Expression::JSONBContainsAnyTopKeys(op)
1498            | Expression::JSONBDeleteAtPath(op)
1499            | Expression::ExtendsLeft(op)
1500            | Expression::ExtendsRight(op)
1501            | Expression::Is(op)
1502            | Expression::MemberOf(op)
1503            | Expression::NullSafeEq(op)
1504            | Expression::NullSafeNeq(op)
1505            | Expression::Glob(op)
1506            | Expression::Match(op) => {
1507                stack.push(&op.left);
1508                stack.push(&op.right);
1509            }
1510
1511            // === UnaryOp variants: this ===
1512            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1513                stack.push(&u.this);
1514            }
1515
1516            // === UnaryFunc variants: this ===
1517            Expression::Upper(f)
1518            | Expression::Lower(f)
1519            | Expression::Length(f)
1520            | Expression::LTrim(f)
1521            | Expression::RTrim(f)
1522            | Expression::Reverse(f)
1523            | Expression::Abs(f)
1524            | Expression::Sqrt(f)
1525            | Expression::Cbrt(f)
1526            | Expression::Ln(f)
1527            | Expression::Exp(f)
1528            | Expression::Sign(f)
1529            | Expression::Date(f)
1530            | Expression::Time(f)
1531            | Expression::DateFromUnixDate(f)
1532            | Expression::UnixDate(f)
1533            | Expression::UnixSeconds(f)
1534            | Expression::UnixMillis(f)
1535            | Expression::UnixMicros(f)
1536            | Expression::TimeStrToDate(f)
1537            | Expression::DateToDi(f)
1538            | Expression::DiToDate(f)
1539            | Expression::TsOrDiToDi(f)
1540            | Expression::TsOrDsToDatetime(f)
1541            | Expression::TsOrDsToTimestamp(f)
1542            | Expression::YearOfWeek(f)
1543            | Expression::YearOfWeekIso(f)
1544            | Expression::Initcap(f)
1545            | Expression::Ascii(f)
1546            | Expression::Chr(f)
1547            | Expression::Soundex(f)
1548            | Expression::ByteLength(f)
1549            | Expression::Hex(f)
1550            | Expression::LowerHex(f)
1551            | Expression::Unicode(f)
1552            | Expression::Radians(f)
1553            | Expression::Degrees(f)
1554            | Expression::Sin(f)
1555            | Expression::Cos(f)
1556            | Expression::Tan(f)
1557            | Expression::Asin(f)
1558            | Expression::Acos(f)
1559            | Expression::Atan(f)
1560            | Expression::IsNan(f)
1561            | Expression::IsInf(f)
1562            | Expression::ArrayLength(f)
1563            | Expression::ArraySize(f)
1564            | Expression::Cardinality(f)
1565            | Expression::ArrayReverse(f)
1566            | Expression::ArrayDistinct(f)
1567            | Expression::ArrayFlatten(f)
1568            | Expression::ArrayCompact(f)
1569            | Expression::Explode(f)
1570            | Expression::ExplodeOuter(f)
1571            | Expression::ToArray(f)
1572            | Expression::MapFromEntries(f)
1573            | Expression::MapKeys(f)
1574            | Expression::MapValues(f)
1575            | Expression::JsonArrayLength(f)
1576            | Expression::JsonKeys(f)
1577            | Expression::JsonType(f)
1578            | Expression::ParseJson(f)
1579            | Expression::ToJson(f)
1580            | Expression::Typeof(f)
1581            | Expression::BitwiseCount(f)
1582            | Expression::Year(f)
1583            | Expression::Month(f)
1584            | Expression::Day(f)
1585            | Expression::Hour(f)
1586            | Expression::Minute(f)
1587            | Expression::Second(f)
1588            | Expression::DayOfWeek(f)
1589            | Expression::DayOfWeekIso(f)
1590            | Expression::DayOfMonth(f)
1591            | Expression::DayOfYear(f)
1592            | Expression::WeekOfYear(f)
1593            | Expression::Quarter(f)
1594            | Expression::Epoch(f)
1595            | Expression::EpochMs(f)
1596            | Expression::TimeStrToUnix(f)
1597            | Expression::SHA(f)
1598            | Expression::SHA1Digest(f)
1599            | Expression::TimeToUnix(f)
1600            | Expression::JSONBool(f)
1601            | Expression::Int64(f)
1602            | Expression::MD5NumberLower64(f)
1603            | Expression::MD5NumberUpper64(f)
1604            | Expression::DateStrToDate(f)
1605            | Expression::DateToDateStr(f) => {
1606                stack.push(&f.this);
1607            }
1608
1609            // === BinaryFunc variants: this, expression ===
1610            Expression::Power(f)
1611            | Expression::NullIf(f)
1612            | Expression::IfNull(f)
1613            | Expression::Nvl(f)
1614            | Expression::UnixToTimeStr(f)
1615            | Expression::Contains(f)
1616            | Expression::StartsWith(f)
1617            | Expression::EndsWith(f)
1618            | Expression::Levenshtein(f)
1619            | Expression::ModFunc(f)
1620            | Expression::Atan2(f)
1621            | Expression::IntDiv(f)
1622            | Expression::AddMonths(f)
1623            | Expression::MonthsBetween(f)
1624            | Expression::NextDay(f)
1625            | Expression::ArrayContains(f)
1626            | Expression::ArrayPosition(f)
1627            | Expression::ArrayAppend(f)
1628            | Expression::ArrayPrepend(f)
1629            | Expression::ArrayUnion(f)
1630            | Expression::ArrayExcept(f)
1631            | Expression::ArrayRemove(f)
1632            | Expression::StarMap(f)
1633            | Expression::MapFromArrays(f)
1634            | Expression::MapContainsKey(f)
1635            | Expression::ElementAt(f)
1636            | Expression::JsonMergePatch(f)
1637            | Expression::JSONBContains(f)
1638            | Expression::JSONBExtract(f) => {
1639                stack.push(&f.this);
1640                stack.push(&f.expression);
1641            }
1642
1643            // === VarArgFunc variants: expressions ===
1644            Expression::Greatest(f)
1645            | Expression::Least(f)
1646            | Expression::Coalesce(f)
1647            | Expression::ArrayConcat(f)
1648            | Expression::ArrayIntersect(f)
1649            | Expression::ArrayZip(f)
1650            | Expression::MapConcat(f)
1651            | Expression::JsonArray(f) => {
1652                for e in &f.expressions {
1653                    stack.push(e);
1654                }
1655            }
1656
1657            // === AggFunc variants: this, filter, having_max, limit ===
1658            Expression::Sum(f)
1659            | Expression::Avg(f)
1660            | Expression::Min(f)
1661            | Expression::Max(f)
1662            | Expression::ArrayAgg(f)
1663            | Expression::CountIf(f)
1664            | Expression::Stddev(f)
1665            | Expression::StddevPop(f)
1666            | Expression::StddevSamp(f)
1667            | Expression::Variance(f)
1668            | Expression::VarPop(f)
1669            | Expression::VarSamp(f)
1670            | Expression::Median(f)
1671            | Expression::Mode(f)
1672            | Expression::First(f)
1673            | Expression::Last(f)
1674            | Expression::AnyValue(f)
1675            | Expression::ApproxDistinct(f)
1676            | Expression::ApproxCountDistinct(f)
1677            | Expression::LogicalAnd(f)
1678            | Expression::LogicalOr(f)
1679            | Expression::Skewness(f)
1680            | Expression::ArrayConcatAgg(f)
1681            | Expression::ArrayUniqueAgg(f)
1682            | Expression::BoolXorAgg(f)
1683            | Expression::BitwiseAndAgg(f)
1684            | Expression::BitwiseOrAgg(f)
1685            | Expression::BitwiseXorAgg(f) => {
1686                stack.push(&f.this);
1687                if let Some(ref filter) = f.filter {
1688                    stack.push(filter);
1689                }
1690                if let Some((ref expr, _)) = f.having_max {
1691                    stack.push(expr);
1692                }
1693                if let Some(ref limit) = f.limit {
1694                    stack.push(limit);
1695                }
1696            }
1697
1698            // === Generic Function / AggregateFunction: args ===
1699            Expression::Function(func) => {
1700                for arg in &func.args {
1701                    stack.push(arg);
1702                }
1703            }
1704            Expression::AggregateFunction(func) => {
1705                for arg in &func.args {
1706                    stack.push(arg);
1707                }
1708                if let Some(ref filter) = func.filter {
1709                    stack.push(filter);
1710                }
1711                if let Some(ref limit) = func.limit {
1712                    stack.push(limit);
1713                }
1714            }
1715
1716            // === WindowFunction: this (skip Over for lineage purposes) ===
1717            Expression::WindowFunction(wf) => {
1718                stack.push(&wf.this);
1719            }
1720
1721            // === Containers and special expressions ===
1722            Expression::Alias(a) => {
1723                stack.push(&a.this);
1724            }
1725            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1726                stack.push(&c.this);
1727                if let Some(ref fmt) = c.format {
1728                    stack.push(fmt);
1729                }
1730                if let Some(ref def) = c.default {
1731                    stack.push(def);
1732                }
1733            }
1734            Expression::Paren(p) => {
1735                stack.push(&p.this);
1736            }
1737            Expression::Annotated(a) => {
1738                stack.push(&a.this);
1739            }
1740            Expression::Case(case) => {
1741                if let Some(ref operand) = case.operand {
1742                    stack.push(operand);
1743                }
1744                for (cond, result) in &case.whens {
1745                    stack.push(cond);
1746                    stack.push(result);
1747                }
1748                if let Some(ref else_expr) = case.else_ {
1749                    stack.push(else_expr);
1750                }
1751            }
1752            Expression::Collation(c) => {
1753                stack.push(&c.this);
1754            }
1755            Expression::In(i) => {
1756                stack.push(&i.this);
1757                for e in &i.expressions {
1758                    stack.push(e);
1759                }
1760                if let Some(ref q) = i.query {
1761                    stack.push(q);
1762                }
1763                if let Some(ref u) = i.unnest {
1764                    stack.push(u);
1765                }
1766            }
1767            Expression::Between(b) => {
1768                stack.push(&b.this);
1769                stack.push(&b.low);
1770                stack.push(&b.high);
1771            }
1772            Expression::IsNull(n) => {
1773                stack.push(&n.this);
1774            }
1775            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1776                stack.push(&t.this);
1777            }
1778            Expression::IsJson(j) => {
1779                stack.push(&j.this);
1780            }
1781            Expression::Like(l) | Expression::ILike(l) => {
1782                stack.push(&l.left);
1783                stack.push(&l.right);
1784                if let Some(ref esc) = l.escape {
1785                    stack.push(esc);
1786                }
1787            }
1788            Expression::SimilarTo(s) => {
1789                stack.push(&s.this);
1790                stack.push(&s.pattern);
1791                if let Some(ref esc) = s.escape {
1792                    stack.push(esc);
1793                }
1794            }
1795            Expression::Ordered(o) => {
1796                stack.push(&o.this);
1797            }
1798            Expression::Array(a) => {
1799                for e in &a.expressions {
1800                    stack.push(e);
1801                }
1802            }
1803            Expression::Tuple(t) => {
1804                for e in &t.expressions {
1805                    stack.push(e);
1806                }
1807            }
1808            Expression::Struct(s) => {
1809                for (_, e) in &s.fields {
1810                    stack.push(e);
1811                }
1812            }
1813            Expression::Subscript(s) => {
1814                stack.push(&s.this);
1815                stack.push(&s.index);
1816            }
1817            Expression::Dot(d) => {
1818                stack.push(&d.this);
1819            }
1820            Expression::MethodCall(m) => {
1821                if !matches!(dialect, Some(DialectType::BigQuery))
1822                    || !is_bigquery_safe_namespace_receiver(&m.this)
1823                {
1824                    stack.push(&m.this);
1825                }
1826                for arg in &m.args {
1827                    stack.push(arg);
1828                }
1829            }
1830            Expression::ArraySlice(s) => {
1831                stack.push(&s.this);
1832                if let Some(ref start) = s.start {
1833                    stack.push(start);
1834                }
1835                if let Some(ref end) = s.end {
1836                    stack.push(end);
1837                }
1838            }
1839            Expression::Lambda(l) => {
1840                stack.push(&l.body);
1841            }
1842            Expression::NamedArgument(n) => {
1843                stack.push(&n.value);
1844            }
1845            Expression::TryCatch(t) => {
1846                for stmt in &t.try_body {
1847                    stack.push(stmt);
1848                }
1849                if let Some(catch_body) = &t.catch_body {
1850                    for stmt in catch_body {
1851                        stack.push(stmt);
1852                    }
1853                }
1854            }
1855            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1856                stack.push(e);
1857            }
1858
1859            // === Custom function structs ===
1860            Expression::Substring(f) => {
1861                stack.push(&f.this);
1862                stack.push(&f.start);
1863                if let Some(ref len) = f.length {
1864                    stack.push(len);
1865                }
1866            }
1867            Expression::Trim(f) => {
1868                stack.push(&f.this);
1869                if let Some(ref chars) = f.characters {
1870                    stack.push(chars);
1871                }
1872            }
1873            Expression::Replace(f) => {
1874                stack.push(&f.this);
1875                stack.push(&f.old);
1876                stack.push(&f.new);
1877            }
1878            Expression::IfFunc(f) => {
1879                stack.push(&f.condition);
1880                stack.push(&f.true_value);
1881                if let Some(ref fv) = f.false_value {
1882                    stack.push(fv);
1883                }
1884            }
1885            Expression::Nvl2(f) => {
1886                stack.push(&f.this);
1887                stack.push(&f.true_value);
1888                stack.push(&f.false_value);
1889            }
1890            Expression::ConcatWs(f) => {
1891                stack.push(&f.separator);
1892                for e in &f.expressions {
1893                    stack.push(e);
1894                }
1895            }
1896            Expression::Count(f) => {
1897                if let Some(ref this) = f.this {
1898                    stack.push(this);
1899                }
1900                if let Some(ref filter) = f.filter {
1901                    stack.push(filter);
1902                }
1903            }
1904            Expression::GroupConcat(f) => {
1905                stack.push(&f.this);
1906                if let Some(ref sep) = f.separator {
1907                    stack.push(sep);
1908                }
1909                if let Some(ref filter) = f.filter {
1910                    stack.push(filter);
1911                }
1912            }
1913            Expression::StringAgg(f) => {
1914                stack.push(&f.this);
1915                if let Some(ref sep) = f.separator {
1916                    stack.push(sep);
1917                }
1918                if let Some(ref filter) = f.filter {
1919                    stack.push(filter);
1920                }
1921                if let Some(ref limit) = f.limit {
1922                    stack.push(limit);
1923                }
1924            }
1925            Expression::ListAgg(f) => {
1926                stack.push(&f.this);
1927                if let Some(ref sep) = f.separator {
1928                    stack.push(sep);
1929                }
1930                if let Some(ref filter) = f.filter {
1931                    stack.push(filter);
1932                }
1933            }
1934            Expression::SumIf(f) => {
1935                stack.push(&f.this);
1936                stack.push(&f.condition);
1937                if let Some(ref filter) = f.filter {
1938                    stack.push(filter);
1939                }
1940            }
1941            Expression::DateAdd(f) | Expression::DateSub(f) => {
1942                stack.push(&f.this);
1943                stack.push(&f.interval);
1944            }
1945            Expression::DateDiff(f) => {
1946                stack.push(&f.this);
1947                stack.push(&f.expression);
1948            }
1949            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1950                stack.push(&f.this);
1951            }
1952            Expression::Extract(f) => {
1953                stack.push(&f.this);
1954            }
1955            Expression::Round(f) => {
1956                stack.push(&f.this);
1957                if let Some(ref d) = f.decimals {
1958                    stack.push(d);
1959                }
1960            }
1961            Expression::Floor(f) => {
1962                stack.push(&f.this);
1963                if let Some(ref s) = f.scale {
1964                    stack.push(s);
1965                }
1966                if let Some(ref t) = f.to {
1967                    stack.push(t);
1968                }
1969            }
1970            Expression::Ceil(f) => {
1971                stack.push(&f.this);
1972                if let Some(ref d) = f.decimals {
1973                    stack.push(d);
1974                }
1975                if let Some(ref t) = f.to {
1976                    stack.push(t);
1977                }
1978            }
1979            Expression::Log(f) => {
1980                stack.push(&f.this);
1981                if let Some(ref b) = f.base {
1982                    stack.push(b);
1983                }
1984            }
1985            Expression::AtTimeZone(f) => {
1986                stack.push(&f.this);
1987                stack.push(&f.zone);
1988            }
1989            Expression::Lead(f) | Expression::Lag(f) => {
1990                stack.push(&f.this);
1991                if let Some(ref off) = f.offset {
1992                    stack.push(off);
1993                }
1994                if let Some(ref def) = f.default {
1995                    stack.push(def);
1996                }
1997            }
1998            Expression::FirstValue(f) | Expression::LastValue(f) => {
1999                stack.push(&f.this);
2000            }
2001            Expression::NthValue(f) => {
2002                stack.push(&f.this);
2003                stack.push(&f.offset);
2004            }
2005            Expression::Position(f) => {
2006                stack.push(&f.substring);
2007                stack.push(&f.string);
2008                if let Some(ref start) = f.start {
2009                    stack.push(start);
2010                }
2011            }
2012            Expression::Decode(f) => {
2013                stack.push(&f.this);
2014                for (search, result) in &f.search_results {
2015                    stack.push(search);
2016                    stack.push(result);
2017                }
2018                if let Some(ref def) = f.default {
2019                    stack.push(def);
2020                }
2021            }
2022            Expression::CharFunc(f) => {
2023                for arg in &f.args {
2024                    stack.push(arg);
2025                }
2026            }
2027            Expression::ArraySort(f) => {
2028                stack.push(&f.this);
2029                if let Some(ref cmp) = f.comparator {
2030                    stack.push(cmp);
2031                }
2032            }
2033            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
2034                stack.push(&f.this);
2035                stack.push(&f.separator);
2036                if let Some(ref nr) = f.null_replacement {
2037                    stack.push(nr);
2038                }
2039            }
2040            Expression::ArrayFilter(f) => {
2041                stack.push(&f.this);
2042                stack.push(&f.filter);
2043            }
2044            Expression::ArrayTransform(f) => {
2045                stack.push(&f.this);
2046                stack.push(&f.transform);
2047            }
2048            Expression::Sequence(f)
2049            | Expression::Generate(f)
2050            | Expression::ExplodingGenerateSeries(f) => {
2051                stack.push(&f.start);
2052                stack.push(&f.stop);
2053                if let Some(ref step) = f.step {
2054                    stack.push(step);
2055                }
2056            }
2057            Expression::JsonExtract(f)
2058            | Expression::JsonExtractScalar(f)
2059            | Expression::JsonQuery(f)
2060            | Expression::JsonValue(f) => {
2061                stack.push(&f.this);
2062                stack.push(&f.path);
2063            }
2064            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
2065                stack.push(&f.this);
2066                for p in &f.paths {
2067                    stack.push(p);
2068                }
2069            }
2070            Expression::JsonObject(f) => {
2071                for (k, v) in &f.pairs {
2072                    stack.push(k);
2073                    stack.push(v);
2074                }
2075            }
2076            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
2077                stack.push(&f.this);
2078                for (path, val) in &f.path_values {
2079                    stack.push(path);
2080                    stack.push(val);
2081                }
2082            }
2083            Expression::Overlay(f) => {
2084                stack.push(&f.this);
2085                stack.push(&f.replacement);
2086                stack.push(&f.from);
2087                if let Some(ref len) = f.length {
2088                    stack.push(len);
2089                }
2090            }
2091            Expression::Convert(f) => {
2092                stack.push(&f.this);
2093                if let Some(ref style) = f.style {
2094                    stack.push(style);
2095                }
2096            }
2097            Expression::ApproxPercentile(f) => {
2098                stack.push(&f.this);
2099                stack.push(&f.percentile);
2100                if let Some(ref acc) = f.accuracy {
2101                    stack.push(acc);
2102                }
2103                if let Some(ref filter) = f.filter {
2104                    stack.push(filter);
2105                }
2106            }
2107            Expression::Percentile(f)
2108            | Expression::PercentileCont(f)
2109            | Expression::PercentileDisc(f) => {
2110                stack.push(&f.this);
2111                stack.push(&f.percentile);
2112                if let Some(ref filter) = f.filter {
2113                    stack.push(filter);
2114                }
2115            }
2116            Expression::WithinGroup(f) => {
2117                stack.push(&f.this);
2118            }
2119            Expression::Left(f) | Expression::Right(f) => {
2120                stack.push(&f.this);
2121                stack.push(&f.length);
2122            }
2123            Expression::Repeat(f) => {
2124                stack.push(&f.this);
2125                stack.push(&f.times);
2126            }
2127            Expression::Lpad(f) | Expression::Rpad(f) => {
2128                stack.push(&f.this);
2129                stack.push(&f.length);
2130                if let Some(ref fill) = f.fill {
2131                    stack.push(fill);
2132                }
2133            }
2134            Expression::Split(f) => {
2135                stack.push(&f.this);
2136                stack.push(&f.delimiter);
2137            }
2138            Expression::RegexpLike(f) => {
2139                stack.push(&f.this);
2140                stack.push(&f.pattern);
2141                if let Some(ref flags) = f.flags {
2142                    stack.push(flags);
2143                }
2144            }
2145            Expression::RegexpReplace(f) => {
2146                stack.push(&f.this);
2147                stack.push(&f.pattern);
2148                stack.push(&f.replacement);
2149                if let Some(ref flags) = f.flags {
2150                    stack.push(flags);
2151                }
2152            }
2153            Expression::RegexpExtract(f) => {
2154                stack.push(&f.this);
2155                stack.push(&f.pattern);
2156                if let Some(ref group) = f.group {
2157                    stack.push(group);
2158                }
2159            }
2160            Expression::ToDate(f) => {
2161                stack.push(&f.this);
2162                if let Some(ref fmt) = f.format {
2163                    stack.push(fmt);
2164                }
2165            }
2166            Expression::ToTimestamp(f) => {
2167                stack.push(&f.this);
2168                if let Some(ref fmt) = f.format {
2169                    stack.push(fmt);
2170                }
2171            }
2172            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2173                stack.push(&f.this);
2174                stack.push(&f.format);
2175            }
2176            Expression::LastDay(f) => {
2177                stack.push(&f.this);
2178            }
2179            Expression::FromUnixtime(f) => {
2180                stack.push(&f.this);
2181                if let Some(ref fmt) = f.format {
2182                    stack.push(fmt);
2183                }
2184            }
2185            Expression::UnixTimestamp(f) => {
2186                if let Some(ref this) = f.this {
2187                    stack.push(this);
2188                }
2189                if let Some(ref fmt) = f.format {
2190                    stack.push(fmt);
2191                }
2192            }
2193            Expression::MakeDate(f) => {
2194                stack.push(&f.year);
2195                stack.push(&f.month);
2196                stack.push(&f.day);
2197            }
2198            Expression::MakeTimestamp(f) => {
2199                stack.push(&f.year);
2200                stack.push(&f.month);
2201                stack.push(&f.day);
2202                stack.push(&f.hour);
2203                stack.push(&f.minute);
2204                stack.push(&f.second);
2205                if let Some(ref tz) = f.timezone {
2206                    stack.push(tz);
2207                }
2208            }
2209            Expression::TruncFunc(f) => {
2210                stack.push(&f.this);
2211                if let Some(ref d) = f.decimals {
2212                    stack.push(d);
2213                }
2214            }
2215            Expression::ArrayFunc(f) => {
2216                for e in &f.expressions {
2217                    stack.push(e);
2218                }
2219            }
2220            Expression::Unnest(f) => {
2221                stack.push(&f.this);
2222                for e in &f.expressions {
2223                    stack.push(e);
2224                }
2225            }
2226            Expression::StructFunc(f) => {
2227                for (_, e) in &f.fields {
2228                    stack.push(e);
2229                }
2230            }
2231            Expression::StructExtract(f) => {
2232                stack.push(&f.this);
2233            }
2234            Expression::NamedStruct(f) => {
2235                for (k, v) in &f.pairs {
2236                    stack.push(k);
2237                    stack.push(v);
2238                }
2239            }
2240            Expression::MapFunc(f) => {
2241                for k in &f.keys {
2242                    stack.push(k);
2243                }
2244                for v in &f.values {
2245                    stack.push(v);
2246                }
2247            }
2248            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2249                stack.push(&f.this);
2250                stack.push(&f.transform);
2251            }
2252            Expression::JsonArrayAgg(f) => {
2253                stack.push(&f.this);
2254                if let Some(ref filter) = f.filter {
2255                    stack.push(filter);
2256                }
2257            }
2258            Expression::JsonObjectAgg(f) => {
2259                stack.push(&f.key);
2260                stack.push(&f.value);
2261                if let Some(ref filter) = f.filter {
2262                    stack.push(filter);
2263                }
2264            }
2265            Expression::NTile(f) => {
2266                if let Some(ref n) = f.num_buckets {
2267                    stack.push(n);
2268                }
2269            }
2270            Expression::Rand(f) => {
2271                if let Some(ref s) = f.seed {
2272                    stack.push(s);
2273                }
2274                if let Some(ref lo) = f.lower {
2275                    stack.push(lo);
2276                }
2277                if let Some(ref hi) = f.upper {
2278                    stack.push(hi);
2279                }
2280            }
2281            Expression::Any(q) | Expression::All(q) => {
2282                stack.push(&q.this);
2283                stack.push(&q.subquery);
2284            }
2285            Expression::Overlaps(o) => {
2286                if let Some(ref this) = o.this {
2287                    stack.push(this);
2288                }
2289                if let Some(ref expr) = o.expression {
2290                    stack.push(expr);
2291                }
2292                if let Some(ref ls) = o.left_start {
2293                    stack.push(ls);
2294                }
2295                if let Some(ref le) = o.left_end {
2296                    stack.push(le);
2297                }
2298                if let Some(ref rs) = o.right_start {
2299                    stack.push(rs);
2300                }
2301                if let Some(ref re) = o.right_end {
2302                    stack.push(re);
2303                }
2304            }
2305            Expression::Interval(i) => {
2306                if let Some(ref this) = i.this {
2307                    stack.push(this);
2308                }
2309            }
2310            Expression::TimeStrToTime(f) => {
2311                stack.push(&f.this);
2312                if let Some(ref zone) = f.zone {
2313                    stack.push(zone);
2314                }
2315            }
2316            Expression::JSONBExtractScalar(f) => {
2317                stack.push(&f.this);
2318                stack.push(&f.expression);
2319                if let Some(ref jt) = f.json_type {
2320                    stack.push(jt);
2321                }
2322            }
2323
2324            // === True leaves and non-expression-bearing nodes ===
2325            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2326            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2327            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2328            _ => {}
2329        }
2330    }
2331}
2332
2333// ---------------------------------------------------------------------------
2334// Tests
2335// ---------------------------------------------------------------------------
2336
2337#[cfg(test)]
2338mod tests {
2339    use super::*;
2340    use crate::dialects::{Dialect, DialectType};
2341    use crate::expressions::DataType;
2342    use crate::optimizer::annotate_types::annotate_types;
2343    use crate::parse_one;
2344    use crate::schema::{MappingSchema, Schema};
2345
2346    fn parse(sql: &str) -> Expression {
2347        let dialect = Dialect::get(DialectType::Generic);
2348        let ast = dialect.parse(sql).unwrap();
2349        ast.into_iter().next().unwrap()
2350    }
2351
2352    #[test]
2353    fn test_simple_lineage() {
2354        let expr = parse("SELECT a FROM t");
2355        let node = lineage("a", &expr, None, false).unwrap();
2356
2357        assert_eq!(node.name, "a");
2358        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2359        // Should trace to t.a
2360        let names = node.downstream_names();
2361        assert!(
2362            names.iter().any(|n| n == "t.a"),
2363            "Expected t.a in downstream, got: {:?}",
2364            names
2365        );
2366    }
2367
2368    #[test]
2369    fn test_lineage_walk() {
2370        let root = LineageNode {
2371            name: "col_a".to_string(),
2372            expression: Expression::Null(crate::expressions::Null),
2373            source: Expression::Null(crate::expressions::Null),
2374            downstream: vec![LineageNode::new(
2375                "t.a",
2376                Expression::Null(crate::expressions::Null),
2377                Expression::Null(crate::expressions::Null),
2378            )],
2379            source_name: String::new(),
2380            source_kind: SourceKind::Unknown,
2381            source_alias: None,
2382            reference_node_name: String::new(),
2383        };
2384
2385        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2386        assert_eq!(names.len(), 2);
2387        assert_eq!(names[0], "col_a");
2388        assert_eq!(names[1], "t.a");
2389    }
2390
2391    #[test]
2392    fn test_aliased_column() {
2393        let expr = parse("SELECT a + 1 AS b FROM t");
2394        let node = lineage("b", &expr, None, false).unwrap();
2395
2396        assert_eq!(node.name, "b");
2397        // Should trace through the expression to t.a
2398        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2399        assert!(
2400            all_names.iter().any(|n| n.contains("a")),
2401            "Expected to trace to column a, got: {:?}",
2402            all_names
2403        );
2404    }
2405
2406    #[test]
2407    fn test_qualified_column() {
2408        let expr = parse("SELECT t.a FROM t");
2409        let node = lineage("a", &expr, None, false).unwrap();
2410
2411        assert_eq!(node.name, "a");
2412        let names = node.downstream_names();
2413        assert!(
2414            names.iter().any(|n| n == "t.a"),
2415            "Expected t.a, got: {:?}",
2416            names
2417        );
2418    }
2419
2420    #[test]
2421    fn test_unqualified_column() {
2422        let expr = parse("SELECT a FROM t");
2423        let node = lineage("a", &expr, None, false).unwrap();
2424
2425        // Unqualified but single source → resolved to t.a
2426        let names = node.downstream_names();
2427        assert!(
2428            names.iter().any(|n| n == "t.a"),
2429            "Expected t.a, got: {:?}",
2430            names
2431        );
2432    }
2433
2434    #[test]
2435    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2436        let query = "SELECT name FROM users";
2437        let dialect = Dialect::get(DialectType::BigQuery);
2438        let expr = dialect
2439            .parse(query)
2440            .unwrap()
2441            .into_iter()
2442            .next()
2443            .expect("expected one expression");
2444
2445        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2446        schema
2447            .add_table("users", &[("name".into(), DataType::Text)], None)
2448            .expect("schema setup");
2449
2450        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2451            .expect("lineage without schema");
2452        let mut expr_without = node_without_schema.expression.clone();
2453        annotate_types(
2454            &mut expr_without,
2455            Some(&schema),
2456            Some(DialectType::BigQuery),
2457        );
2458        assert_eq!(
2459            expr_without.inferred_type(),
2460            None,
2461            "Expected unresolved root type without schema-aware lineage qualification"
2462        );
2463
2464        let node_with_schema = lineage_with_schema(
2465            "name",
2466            &expr,
2467            Some(&schema),
2468            Some(DialectType::BigQuery),
2469            false,
2470        )
2471        .expect("lineage with schema");
2472        let mut expr_with = node_with_schema.expression.clone();
2473        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2474
2475        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2476    }
2477
2478    #[test]
2479    fn test_lineage_with_schema_correlated_scalar_subquery() {
2480        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2481        let dialect = Dialect::get(DialectType::BigQuery);
2482        let expr = dialect
2483            .parse(query)
2484            .unwrap()
2485            .into_iter()
2486            .next()
2487            .expect("expected one expression");
2488
2489        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2490        schema
2491            .add_table(
2492                "t1",
2493                &[("id".into(), DataType::BigInt { length: None })],
2494                None,
2495            )
2496            .expect("schema setup");
2497        schema
2498            .add_table(
2499                "t2",
2500                &[
2501                    ("id".into(), DataType::BigInt { length: None }),
2502                    ("val".into(), DataType::BigInt { length: None }),
2503                ],
2504                None,
2505            )
2506            .expect("schema setup");
2507
2508        let node = lineage_with_schema(
2509            "id",
2510            &expr,
2511            Some(&schema),
2512            Some(DialectType::BigQuery),
2513            false,
2514        )
2515        .expect("lineage_with_schema should handle correlated scalar subqueries");
2516
2517        assert_eq!(node.name, "id");
2518    }
2519
2520    #[test]
2521    fn test_lineage_with_schema_join_using() {
2522        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2523        let dialect = Dialect::get(DialectType::BigQuery);
2524        let expr = dialect
2525            .parse(query)
2526            .unwrap()
2527            .into_iter()
2528            .next()
2529            .expect("expected one expression");
2530
2531        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2532        schema
2533            .add_table(
2534                "t1",
2535                &[("a".into(), DataType::BigInt { length: None })],
2536                None,
2537            )
2538            .expect("schema setup");
2539        schema
2540            .add_table(
2541                "t2",
2542                &[("a".into(), DataType::BigInt { length: None })],
2543                None,
2544            )
2545            .expect("schema setup");
2546
2547        let node = lineage_with_schema(
2548            "a",
2549            &expr,
2550            Some(&schema),
2551            Some(DialectType::BigQuery),
2552            false,
2553        )
2554        .expect("lineage_with_schema should handle JOIN USING");
2555
2556        assert_eq!(node.name, "a");
2557    }
2558
2559    #[test]
2560    fn test_lineage_with_schema_qualified_table_name() {
2561        let query = "SELECT a FROM raw.t1";
2562        let dialect = Dialect::get(DialectType::BigQuery);
2563        let expr = dialect
2564            .parse(query)
2565            .unwrap()
2566            .into_iter()
2567            .next()
2568            .expect("expected one expression");
2569
2570        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2571        schema
2572            .add_table(
2573                "raw.t1",
2574                &[("a".into(), DataType::BigInt { length: None })],
2575                None,
2576            )
2577            .expect("schema setup");
2578
2579        let node = lineage_with_schema(
2580            "a",
2581            &expr,
2582            Some(&schema),
2583            Some(DialectType::BigQuery),
2584            false,
2585        )
2586        .expect("lineage_with_schema should handle dotted schema.table names");
2587
2588        assert_eq!(node.name, "a");
2589    }
2590
2591    #[test]
2592    fn test_lineage_with_schema_none_matches_lineage() {
2593        let expr = parse("SELECT a FROM t");
2594        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2595        let with_none =
2596            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2597
2598        assert_eq!(with_none.name, baseline.name);
2599        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2600    }
2601
2602    #[test]
2603    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2604        let dialect = Dialect::get(DialectType::BigQuery);
2605        let expr = dialect
2606            .parse("SELECT Name AS name FROM teams")
2607            .unwrap()
2608            .into_iter()
2609            .next()
2610            .expect("expected one expression");
2611
2612        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2613        schema
2614            .add_table(
2615                "teams",
2616                &[("Name".into(), DataType::String { length: None })],
2617                None,
2618            )
2619            .expect("schema setup");
2620
2621        let node = lineage_with_schema(
2622            "name",
2623            &expr,
2624            Some(&schema),
2625            Some(DialectType::BigQuery),
2626            false,
2627        )
2628        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2629
2630        let names = node.downstream_names();
2631        assert!(
2632            names.iter().any(|n| n == "teams.Name"),
2633            "Expected teams.Name in downstream, got: {:?}",
2634            names
2635        );
2636    }
2637
2638    #[test]
2639    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2640        let dialect = Dialect::get(DialectType::BigQuery);
2641        let expr = dialect
2642            .parse("SELECT Name AS Name FROM teams")
2643            .unwrap()
2644            .into_iter()
2645            .next()
2646            .expect("expected one expression");
2647
2648        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2649            .expect("lineage should resolve mixed-case aliases in BigQuery");
2650
2651        assert_eq!(node.name, "name");
2652    }
2653
2654    #[test]
2655    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
2656        let expr = parse_one(
2657            r#"
2658SELECT date_val AS week_start
2659FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
2660"#,
2661            DialectType::BigQuery,
2662        )
2663        .expect("parse");
2664
2665        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
2666            .expect("lineage should resolve UNNEST alias as a source");
2667        let child = node
2668            .downstream
2669            .first()
2670            .expect("week_start should have downstream lineage");
2671
2672        assert_eq!(child.name, "_0.date_val");
2673        assert_eq!(child.source_name, "_0");
2674        assert_eq!(child.source_kind, SourceKind::Virtual);
2675        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
2676
2677        let Expression::Column(column) = &child.expression else {
2678            panic!(
2679                "expected downstream column expression, got {:?}",
2680                child.expression
2681            );
2682        };
2683        assert_eq!(column.name.name, "date_val");
2684        assert_eq!(
2685            column.table.as_ref().map(|table| table.name.as_str()),
2686            Some("_0")
2687        );
2688        assert!(
2689            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
2690            "expected UNNEST source expression, got {:?}",
2691            child.source
2692        );
2693    }
2694
2695    #[test]
2696    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
2697        let expr =
2698            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
2699
2700        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2701        let child = node.downstream.first().expect("id should have lineage");
2702
2703        assert_eq!(child.name, "date_val.id");
2704        assert_eq!(child.source_name, "date_val");
2705        assert_eq!(child.source_kind, SourceKind::Table);
2706        assert_eq!(child.source_alias, None);
2707    }
2708
2709    #[test]
2710    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
2711        let expr = parse_one(
2712            r#"
2713SELECT a.a AS first_value, b.b AS second_value
2714FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
2715JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
2716"#,
2717            DialectType::BigQuery,
2718        )
2719        .expect("parse");
2720
2721        let first =
2722            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2723        let second =
2724            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2725
2726        let first_child = first.downstream.first().expect("first source");
2727        let second_child = second.downstream.first().expect("second source");
2728
2729        assert_eq!(first_child.name, "_0.a");
2730        assert_eq!(first_child.source_name, "_0");
2731        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
2732        assert_eq!(first_child.source_kind, SourceKind::Virtual);
2733
2734        assert_eq!(second_child.name, "_1.b");
2735        assert_eq!(second_child.source_name, "_1");
2736        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
2737        assert_eq!(second_child.source_kind, SourceKind::Virtual);
2738    }
2739
2740    #[test]
2741    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
2742        let expr = parse_one(
2743            r#"
2744SELECT item.item AS item
2745FROM t JOIN UNNEST(t.items) AS item ON TRUE
2746"#,
2747            DialectType::BigQuery,
2748        )
2749        .expect("parse");
2750
2751        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
2752        let virtual_child = node.downstream.first().expect("virtual item source");
2753        assert_eq!(virtual_child.name, "_0.item");
2754        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
2755
2756        let real_child = virtual_child
2757            .downstream
2758            .first()
2759            .expect("UNNEST(t.items) should depend on t.items");
2760        assert_eq!(real_child.name, "t.items");
2761        assert_eq!(real_child.source_name, "t");
2762        assert_eq!(real_child.source_kind, SourceKind::Table);
2763    }
2764
2765    #[test]
2766    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2767        let expr = parse_one(
2768            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2769            DialectType::Snowflake,
2770        )
2771        .expect("parse");
2772
2773        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2774        schema
2775            .add_table(
2776                "fact.some_daily_metrics",
2777                &[("date_utc".to_string(), DataType::Date)],
2778                None,
2779            )
2780            .expect("schema setup");
2781
2782        let node = lineage_with_schema(
2783            "recency",
2784            &expr,
2785            Some(&schema),
2786            Some(DialectType::Snowflake),
2787            false,
2788        )
2789        .expect("lineage_with_schema should not treat date part as a column");
2790
2791        let names = node.downstream_names();
2792        assert!(
2793            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2794            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2795            names
2796        );
2797        assert!(
2798            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2799            "Did not expect date part to appear as lineage column, got: {:?}",
2800            names
2801        );
2802    }
2803
2804    #[test]
2805    fn test_snowflake_datediff_parses_to_typed_ast() {
2806        let expr = parse_one(
2807            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2808            DialectType::Snowflake,
2809        )
2810        .expect("parse");
2811
2812        match expr {
2813            Expression::Select(select) => match &select.expressions[0] {
2814                Expression::Alias(alias) => match &alias.this {
2815                    Expression::DateDiff(f) => {
2816                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2817                    }
2818                    other => panic!("expected DateDiff, got {other:?}"),
2819                },
2820                other => panic!("expected Alias, got {other:?}"),
2821            },
2822            other => panic!("expected Select, got {other:?}"),
2823        }
2824    }
2825
2826    #[test]
2827    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2828        let expr = parse_one(
2829            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2830            DialectType::Snowflake,
2831        )
2832        .expect("parse");
2833
2834        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2835        schema
2836            .add_table(
2837                "fact.some_daily_metrics",
2838                &[("date_utc".to_string(), DataType::Date)],
2839                None,
2840            )
2841            .expect("schema setup");
2842
2843        let node = lineage_with_schema(
2844            "next_day",
2845            &expr,
2846            Some(&schema),
2847            Some(DialectType::Snowflake),
2848            false,
2849        )
2850        .expect("lineage_with_schema should not treat DATEADD date part as a column");
2851
2852        let names = node.downstream_names();
2853        assert!(
2854            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2855            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2856            names
2857        );
2858        assert!(
2859            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2860            "Did not expect date part to appear as lineage column, got: {:?}",
2861            names
2862        );
2863    }
2864
2865    #[test]
2866    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2867        let expr = parse_one(
2868            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2869            DialectType::Snowflake,
2870        )
2871        .expect("parse");
2872
2873        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2874        schema
2875            .add_table(
2876                "fact.some_daily_metrics",
2877                &[("date_utc".to_string(), DataType::Date)],
2878                None,
2879            )
2880            .expect("schema setup");
2881
2882        let node = lineage_with_schema(
2883            "day_part",
2884            &expr,
2885            Some(&schema),
2886            Some(DialectType::Snowflake),
2887            false,
2888        )
2889        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2890
2891        let names = node.downstream_names();
2892        assert!(
2893            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2894            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2895            names
2896        );
2897        assert!(
2898            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2899            "Did not expect date part to appear as lineage column, got: {:?}",
2900            names
2901        );
2902    }
2903
2904    #[test]
2905    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2906        let expr = parse_one(
2907            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2908            DialectType::Snowflake,
2909        )
2910        .expect("parse");
2911
2912        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2913        schema
2914            .add_table(
2915                "fact.some_daily_metrics",
2916                &[("date_utc".to_string(), DataType::Date)],
2917                None,
2918            )
2919            .expect("schema setup");
2920
2921        let node = lineage_with_schema(
2922            "day_part",
2923            &expr,
2924            Some(&schema),
2925            Some(DialectType::Snowflake),
2926            false,
2927        )
2928        .expect("quoted DATE_PART should continue to work");
2929
2930        let names = node.downstream_names();
2931        assert!(
2932            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2933            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2934            names
2935        );
2936    }
2937
2938    #[test]
2939    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2940        let expr = parse_one(
2941            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2942            DialectType::Snowflake,
2943        )
2944        .expect("parse");
2945
2946        match expr {
2947            Expression::Select(select) => match &select.expressions[0] {
2948                Expression::Alias(alias) => match &alias.this {
2949                    Expression::Function(f) => {
2950                        assert_eq!(f.name.to_uppercase(), "DATEADD");
2951                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2952                    }
2953                    other => panic!("expected generic DATEADD function, got {other:?}"),
2954                },
2955                other => panic!("expected Alias, got {other:?}"),
2956            },
2957            other => panic!("expected Select, got {other:?}"),
2958        }
2959    }
2960
2961    #[test]
2962    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2963        let expr = parse_one(
2964            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2965            DialectType::Snowflake,
2966        )
2967        .expect("parse");
2968
2969        match expr {
2970            Expression::Select(select) => match &select.expressions[0] {
2971                Expression::Alias(alias) => match &alias.this {
2972                    Expression::Function(f) => {
2973                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2974                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2975                    }
2976                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2977                },
2978                other => panic!("expected Alias, got {other:?}"),
2979            },
2980            other => panic!("expected Select, got {other:?}"),
2981        }
2982    }
2983
2984    #[test]
2985    fn test_snowflake_date_part_string_literal_stays_generic_function() {
2986        let expr = parse_one(
2987            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2988            DialectType::Snowflake,
2989        )
2990        .expect("parse");
2991
2992        match expr {
2993            Expression::Select(select) => match &select.expressions[0] {
2994                Expression::Alias(alias) => match &alias.this {
2995                    Expression::Function(f) => {
2996                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2997                    }
2998                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2999                },
3000                other => panic!("expected Alias, got {other:?}"),
3001            },
3002            other => panic!("expected Select, got {other:?}"),
3003        }
3004    }
3005
3006    #[test]
3007    fn test_lineage_join() {
3008        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3009
3010        let node_a = lineage("a", &expr, None, false).unwrap();
3011        let names_a = node_a.downstream_names();
3012        assert!(
3013            names_a.iter().any(|n| n == "t.a"),
3014            "Expected t.a, got: {:?}",
3015            names_a
3016        );
3017
3018        let node_b = lineage("b", &expr, None, false).unwrap();
3019        let names_b = node_b.downstream_names();
3020        assert!(
3021            names_b.iter().any(|n| n == "s.b"),
3022            "Expected s.b, got: {:?}",
3023            names_b
3024        );
3025    }
3026
3027    #[test]
3028    fn test_lineage_alias_leaf_has_resolved_source_name() {
3029        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
3030        let node = lineage("col1", &expr, None, false).unwrap();
3031
3032        // Keep alias in the display lineage edge.
3033        let names = node.downstream_names();
3034        assert!(
3035            names.iter().any(|n| n == "t1.col1"),
3036            "Expected aliased column edge t1.col1, got: {:?}",
3037            names
3038        );
3039
3040        // Leaf should expose the resolved base table for consumers.
3041        let leaf = node
3042            .downstream
3043            .iter()
3044            .find(|n| n.name == "t1.col1")
3045            .expect("Expected t1.col1 leaf");
3046        assert_eq!(leaf.source_name, "table1");
3047        match &leaf.source {
3048            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
3049            _ => panic!("Expected leaf source to be a table expression"),
3050        }
3051    }
3052
3053    #[test]
3054    fn test_lineage_derived_table() {
3055        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
3056        let node = lineage("a", &expr, None, false).unwrap();
3057
3058        assert_eq!(node.name, "a");
3059        // Should trace through the derived table to t.a
3060        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3061        assert!(
3062            all_names.iter().any(|n| n == "t.a"),
3063            "Expected to trace through derived table to t.a, got: {:?}",
3064            all_names
3065        );
3066    }
3067
3068    #[test]
3069    fn test_lineage_cte() {
3070        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
3071        let node = lineage("a", &expr, None, false).unwrap();
3072
3073        assert_eq!(node.name, "a");
3074        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3075        assert!(
3076            all_names.iter().any(|n| n == "t.a"),
3077            "Expected to trace through CTE to t.a, got: {:?}",
3078            all_names
3079        );
3080    }
3081
3082    #[test]
3083    fn test_lineage_union() {
3084        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
3085        let node = lineage("a", &expr, None, false).unwrap();
3086
3087        assert_eq!(node.name, "a");
3088        // Should have 2 downstream branches
3089        assert_eq!(
3090            node.downstream.len(),
3091            2,
3092            "Expected 2 branches for UNION, got {}",
3093            node.downstream.len()
3094        );
3095    }
3096
3097    #[test]
3098    fn test_lineage_cte_union() {
3099        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
3100        let node = lineage("a", &expr, None, false).unwrap();
3101
3102        // Should trace through CTE into both UNION branches
3103        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3104        assert!(
3105            all_names.len() >= 3,
3106            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
3107            all_names
3108        );
3109    }
3110
3111    #[test]
3112    fn test_lineage_star() {
3113        let expr = parse("SELECT * FROM t");
3114        let node = lineage("*", &expr, None, false).unwrap();
3115
3116        assert_eq!(node.name, "*");
3117        // Should have downstream for table t
3118        assert!(
3119            !node.downstream.is_empty(),
3120            "Star should produce downstream nodes"
3121        );
3122    }
3123
3124    #[test]
3125    fn test_lineage_subquery_in_select() {
3126        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
3127        let node = lineage("x", &expr, None, false).unwrap();
3128
3129        assert_eq!(node.name, "x");
3130        // Should have traced into the scalar subquery
3131        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3132        assert!(
3133            all_names.len() >= 2,
3134            "Expected tracing into scalar subquery, got: {:?}",
3135            all_names
3136        );
3137    }
3138
3139    #[test]
3140    fn test_lineage_multiple_columns() {
3141        let expr = parse("SELECT a, b FROM t");
3142
3143        let node_a = lineage("a", &expr, None, false).unwrap();
3144        let node_b = lineage("b", &expr, None, false).unwrap();
3145
3146        assert_eq!(node_a.name, "a");
3147        assert_eq!(node_b.name, "b");
3148
3149        // Each should trace independently
3150        let names_a = node_a.downstream_names();
3151        let names_b = node_b.downstream_names();
3152        assert!(names_a.iter().any(|n| n == "t.a"));
3153        assert!(names_b.iter().any(|n| n == "t.b"));
3154    }
3155
3156    #[test]
3157    fn test_get_source_tables() {
3158        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
3159        let node = lineage("a", &expr, None, false).unwrap();
3160
3161        let tables = get_source_tables(&node);
3162        assert!(
3163            tables.contains("t"),
3164            "Expected source table 't', got: {:?}",
3165            tables
3166        );
3167    }
3168
3169    #[test]
3170    fn test_lineage_column_not_found() {
3171        let expr = parse("SELECT a FROM t");
3172        let result = lineage("nonexistent", &expr, None, false);
3173        assert!(result.is_err());
3174    }
3175
3176    #[test]
3177    fn test_lineage_nested_cte() {
3178        let expr = parse(
3179            "WITH cte1 AS (SELECT a FROM t), \
3180             cte2 AS (SELECT a FROM cte1) \
3181             SELECT a FROM cte2",
3182        );
3183        let node = lineage("a", &expr, None, false).unwrap();
3184
3185        // Should trace through cte2 → cte1 → t
3186        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3187        assert!(
3188            all_names.len() >= 3,
3189            "Expected to trace through nested CTEs, got: {:?}",
3190            all_names
3191        );
3192    }
3193
3194    #[test]
3195    fn test_trim_selects_true() {
3196        let expr = parse("SELECT a, b, c FROM t");
3197        let node = lineage("a", &expr, None, true).unwrap();
3198
3199        // The source should be trimmed to only include 'a'
3200        if let Expression::Select(select) = &node.source {
3201            assert_eq!(
3202                select.expressions.len(),
3203                1,
3204                "Trimmed source should have 1 expression, got {}",
3205                select.expressions.len()
3206            );
3207        } else {
3208            panic!("Expected Select source");
3209        }
3210    }
3211
3212    #[test]
3213    fn test_trim_selects_false() {
3214        let expr = parse("SELECT a, b, c FROM t");
3215        let node = lineage("a", &expr, None, false).unwrap();
3216
3217        // The source should keep all columns
3218        if let Expression::Select(select) = &node.source {
3219            assert_eq!(
3220                select.expressions.len(),
3221                3,
3222                "Untrimmed source should have 3 expressions"
3223            );
3224        } else {
3225            panic!("Expected Select source");
3226        }
3227    }
3228
3229    #[test]
3230    fn test_lineage_expression_in_select() {
3231        let expr = parse("SELECT a + b AS c FROM t");
3232        let node = lineage("c", &expr, None, false).unwrap();
3233
3234        // Should trace to both a and b from t
3235        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3236        assert!(
3237            all_names.len() >= 3,
3238            "Expected to trace a + b to both columns, got: {:?}",
3239            all_names
3240        );
3241    }
3242
3243    #[test]
3244    fn test_set_operation_by_index() {
3245        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
3246
3247        // Trace column "a" which is at index 0
3248        let node = lineage("a", &expr, None, false).unwrap();
3249
3250        // UNION branches should be traced by index
3251        assert_eq!(node.downstream.len(), 2);
3252    }
3253
3254    // --- Tests for column lineage inside function calls (issue #18) ---
3255
3256    fn print_node(node: &LineageNode, indent: usize) {
3257        let pad = "  ".repeat(indent);
3258        println!(
3259            "{pad}name={:?} source_name={:?}",
3260            node.name, node.source_name
3261        );
3262        for child in &node.downstream {
3263            print_node(child, indent + 1);
3264        }
3265    }
3266
3267    #[test]
3268    fn test_issue18_repro() {
3269        // Exact scenario from the issue
3270        let query = "SELECT UPPER(name) as upper_name FROM users";
3271        println!("Query: {query}\n");
3272
3273        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
3274        let exprs = dialect.parse(query).unwrap();
3275        let expr = &exprs[0];
3276
3277        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3278        println!("lineage(\"upper_name\"):");
3279        print_node(&node, 1);
3280
3281        let names = node.downstream_names();
3282        assert!(
3283            names.iter().any(|n| n == "users.name"),
3284            "Expected users.name in downstream, got: {:?}",
3285            names
3286        );
3287    }
3288
3289    #[test]
3290    fn test_lineage_bigquery_safe_namespace_issue207() {
3291        let query = r#"
3292WITH import_cte AS (
3293  SELECT timestamp, data, operation
3294  FROM `project`.`dataset`.`source_table`
3295),
3296transform_cte AS (
3297  SELECT
3298    timestamp,
3299    SAFE.PARSE_JSON(data) AS json_data
3300  FROM import_cte
3301)
3302SELECT json_data FROM transform_cte
3303"#;
3304        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3305        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3306            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3307        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3308
3309        assert!(
3310            names.iter().any(|name| name == "source_table.data"),
3311            "expected source_table.data in lineage, got {names:?}"
3312        );
3313        assert!(
3314            !names
3315                .iter()
3316                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3317            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3318        );
3319    }
3320
3321    #[test]
3322    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3323        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3324        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3325            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3326        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3327
3328        assert!(
3329            names.iter().any(|name| name == "t.data"),
3330            "expected t.data in lineage, got {names:?}"
3331        );
3332        assert!(
3333            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3334            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3335        );
3336    }
3337
3338    #[test]
3339    fn test_lineage_method_call_receiver_control() {
3340        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3341        let node = lineage("out", &expr, None, false).expect("lineage");
3342        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3343
3344        assert!(
3345            names.iter().any(|name| name == "t.obj"),
3346            "expected ordinary method receiver to remain in lineage, got {names:?}"
3347        );
3348        assert!(
3349            names.iter().any(|name| name == "t.arg"),
3350            "expected method argument in lineage, got {names:?}"
3351        );
3352    }
3353
3354    #[test]
3355    fn test_lineage_upper_function() {
3356        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3357        let node = lineage("upper_name", &expr, None, false).unwrap();
3358
3359        let names = node.downstream_names();
3360        assert!(
3361            names.iter().any(|n| n == "users.name"),
3362            "Expected users.name in downstream, got: {:?}",
3363            names
3364        );
3365    }
3366
3367    #[test]
3368    fn test_lineage_round_function() {
3369        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3370        let node = lineage("rounded", &expr, None, false).unwrap();
3371
3372        let names = node.downstream_names();
3373        assert!(
3374            names.iter().any(|n| n == "products.price"),
3375            "Expected products.price in downstream, got: {:?}",
3376            names
3377        );
3378    }
3379
3380    #[test]
3381    fn test_lineage_coalesce_function() {
3382        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3383        let node = lineage("val", &expr, None, false).unwrap();
3384
3385        let names = node.downstream_names();
3386        assert!(
3387            names.iter().any(|n| n == "t.a"),
3388            "Expected t.a in downstream, got: {:?}",
3389            names
3390        );
3391        assert!(
3392            names.iter().any(|n| n == "t.b"),
3393            "Expected t.b in downstream, got: {:?}",
3394            names
3395        );
3396    }
3397
3398    #[test]
3399    fn test_lineage_count_function() {
3400        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3401        let node = lineage("cnt", &expr, None, false).unwrap();
3402
3403        let names = node.downstream_names();
3404        assert!(
3405            names.iter().any(|n| n == "t.id"),
3406            "Expected t.id in downstream, got: {:?}",
3407            names
3408        );
3409    }
3410
3411    #[test]
3412    fn test_lineage_sum_function() {
3413        let expr = parse("SELECT SUM(amount) AS total FROM t");
3414        let node = lineage("total", &expr, None, false).unwrap();
3415
3416        let names = node.downstream_names();
3417        assert!(
3418            names.iter().any(|n| n == "t.amount"),
3419            "Expected t.amount in downstream, got: {:?}",
3420            names
3421        );
3422    }
3423
3424    #[test]
3425    fn test_lineage_case_with_nested_functions() {
3426        let expr =
3427            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3428        let node = lineage("result", &expr, None, false).unwrap();
3429
3430        let names = node.downstream_names();
3431        assert!(
3432            names.iter().any(|n| n == "t.x"),
3433            "Expected t.x in downstream, got: {:?}",
3434            names
3435        );
3436        assert!(
3437            names.iter().any(|n| n == "t.name"),
3438            "Expected t.name in downstream, got: {:?}",
3439            names
3440        );
3441    }
3442
3443    #[test]
3444    fn test_lineage_substring_function() {
3445        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3446        let node = lineage("short", &expr, None, false).unwrap();
3447
3448        let names = node.downstream_names();
3449        assert!(
3450            names.iter().any(|n| n == "t.name"),
3451            "Expected t.name in downstream, got: {:?}",
3452            names
3453        );
3454    }
3455
3456    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3457
3458    #[test]
3459    fn test_lineage_cte_select_star() {
3460        // Ported from sqlglot: test_lineage_source_with_star
3461        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3462        // After star expansion: SELECT y.a AS a FROM y
3463        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3464        let node = lineage("a", &expr, None, false).unwrap();
3465
3466        assert_eq!(node.name, "a");
3467        // Should successfully resolve column 'a' through the CTE
3468        // (previously failed with "Cannot find column 'a' in query")
3469        assert!(
3470            !node.downstream.is_empty(),
3471            "Expected downstream nodes tracing through CTE, got none"
3472        );
3473    }
3474
3475    #[test]
3476    fn test_lineage_cte_select_star_renamed_column() {
3477        // dbt standard pattern: CTE with column rename + outer SELECT *
3478        // This is the primary use case for dbt projects (jaffle-shop etc.)
3479        let expr =
3480            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3481        let node = lineage("customer_id", &expr, None, false).unwrap();
3482
3483        assert_eq!(node.name, "customer_id");
3484        // Should trace customer_id → renamed CTE → source.id
3485        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3486        assert!(
3487            all_names.len() >= 2,
3488            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3489            all_names
3490        );
3491    }
3492
3493    #[test]
3494    fn test_lineage_cte_select_star_multiple_columns() {
3495        // CTE exposes multiple columns, outer SELECT * should resolve each
3496        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3497
3498        for col in &["a", "b", "c"] {
3499            let node = lineage(col, &expr, None, false).unwrap();
3500            assert_eq!(node.name, *col);
3501            // Verify lineage resolves without error (star expanded to explicit columns)
3502            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3503            assert!(
3504                all_names.len() >= 2,
3505                "Expected at least 2 nodes for column {}, got: {:?}",
3506                col,
3507                all_names
3508            );
3509        }
3510    }
3511
3512    #[test]
3513    fn test_lineage_nested_cte_select_star() {
3514        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3515        let expr = parse(
3516            "WITH cte1 AS (SELECT a FROM t), \
3517             cte2 AS (SELECT * FROM cte1) \
3518             SELECT * FROM cte2",
3519        );
3520        let node = lineage("a", &expr, None, false).unwrap();
3521
3522        assert_eq!(node.name, "a");
3523        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3524        assert!(
3525            all_names.len() >= 3,
3526            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3527            all_names
3528        );
3529    }
3530
3531    #[test]
3532    fn test_lineage_three_level_nested_cte_star() {
3533        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3534        let expr = parse(
3535            "WITH cte1 AS (SELECT x FROM t), \
3536             cte2 AS (SELECT * FROM cte1), \
3537             cte3 AS (SELECT * FROM cte2) \
3538             SELECT * FROM cte3",
3539        );
3540        let node = lineage("x", &expr, None, false).unwrap();
3541
3542        assert_eq!(node.name, "x");
3543        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3544        assert!(
3545            all_names.len() >= 4,
3546            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3547            all_names
3548        );
3549    }
3550
3551    #[test]
3552    fn test_lineage_cte_union_star() {
3553        // CTE with UNION body, outer SELECT * should resolve from left branch
3554        let expr = parse(
3555            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3556             SELECT * FROM cte",
3557        );
3558        let node = lineage("a", &expr, None, false).unwrap();
3559
3560        assert_eq!(node.name, "a");
3561        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3562        assert!(
3563            all_names.len() >= 2,
3564            "Expected at least 2 nodes for CTE union star, got: {:?}",
3565            all_names
3566        );
3567    }
3568
3569    #[test]
3570    fn test_lineage_cte_star_unknown_table() {
3571        // When CTE references an unknown table, star expansion is skipped gracefully
3572        // and lineage falls back to normal resolution (which may fail)
3573        let expr = parse(
3574            "WITH cte AS (SELECT * FROM unknown_table) \
3575             SELECT * FROM cte",
3576        );
3577        // This should not panic — it may succeed or fail depending on resolution,
3578        // but should not crash
3579        let _result = lineage("x", &expr, None, false);
3580    }
3581
3582    #[test]
3583    fn test_lineage_cte_explicit_columns() {
3584        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3585        let expr = parse(
3586            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3587             SELECT * FROM cte",
3588        );
3589        let node = lineage("x", &expr, None, false).unwrap();
3590        assert_eq!(node.name, "x");
3591    }
3592
3593    #[test]
3594    fn test_lineage_cte_qualified_star() {
3595        // Qualified star: SELECT cte.* FROM cte
3596        let expr = parse(
3597            "WITH cte AS (SELECT a, b FROM t) \
3598             SELECT cte.* FROM cte",
3599        );
3600        for col in &["a", "b"] {
3601            let node = lineage(col, &expr, None, false).unwrap();
3602            assert_eq!(node.name, *col);
3603            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3604            assert!(
3605                all_names.len() >= 2,
3606                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3607                col,
3608                all_names
3609            );
3610        }
3611    }
3612
3613    #[test]
3614    fn test_lineage_subquery_select_star() {
3615        // Ported from sqlglot: test_select_star
3616        // SELECT x FROM (SELECT * FROM table_a)
3617        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3618        let node = lineage("x", &expr, None, false).unwrap();
3619
3620        assert_eq!(node.name, "x");
3621        assert!(
3622            !node.downstream.is_empty(),
3623            "Expected downstream nodes for subquery with SELECT *, got none"
3624        );
3625    }
3626
3627    #[test]
3628    fn test_lineage_cte_star_with_schema_external_table() {
3629        // CTE references an external table via SELECT * — schema enables expansion
3630        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3631SELECT * FROM orders"#;
3632        let expr = parse(sql);
3633
3634        let mut schema = MappingSchema::new();
3635        let cols = vec![
3636            ("order_id".to_string(), DataType::Unknown),
3637            ("customer_id".to_string(), DataType::Unknown),
3638            ("amount".to_string(), DataType::Unknown),
3639        ];
3640        schema.add_table("stg_orders", &cols, None).unwrap();
3641
3642        let node =
3643            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3644                .unwrap();
3645        assert_eq!(node.name, "order_id");
3646    }
3647
3648    #[test]
3649    fn test_lineage_cte_star_with_schema_three_part_name() {
3650        // CTE references an external table with fully-qualified 3-part name
3651        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3652SELECT * FROM orders"#;
3653        let expr = parse(sql);
3654
3655        let mut schema = MappingSchema::new();
3656        let cols = vec![
3657            ("order_id".to_string(), DataType::Unknown),
3658            ("customer_id".to_string(), DataType::Unknown),
3659        ];
3660        schema
3661            .add_table("db.schema.stg_orders", &cols, None)
3662            .unwrap();
3663
3664        let node = lineage_with_schema(
3665            "customer_id",
3666            &expr,
3667            Some(&schema as &dyn Schema),
3668            None,
3669            false,
3670        )
3671        .unwrap();
3672        assert_eq!(node.name, "customer_id");
3673    }
3674
3675    #[test]
3676    fn test_lineage_cte_star_with_schema_nested() {
3677        // Nested CTEs: outer CTE references inner CTE with SELECT *,
3678        // inner CTE references external table with SELECT *
3679        let sql = r#"WITH
3680            raw AS (SELECT * FROM external_table),
3681            enriched AS (SELECT * FROM raw)
3682        SELECT * FROM enriched"#;
3683        let expr = parse(sql);
3684
3685        let mut schema = MappingSchema::new();
3686        let cols = vec![
3687            ("id".to_string(), DataType::Unknown),
3688            ("name".to_string(), DataType::Unknown),
3689        ];
3690        schema.add_table("external_table", &cols, None).unwrap();
3691
3692        let node =
3693            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3694        assert_eq!(node.name, "name");
3695    }
3696
3697    #[test]
3698    fn test_lineage_cte_qualified_star_with_schema() {
3699        // CTE uses qualified star (orders.*) from a CTE whose columns
3700        // come from an external table via SELECT *
3701        let sql = r#"WITH
3702            orders AS (SELECT * FROM stg_orders),
3703            enriched AS (
3704                SELECT orders.*, 'extra' AS extra
3705                FROM orders
3706            )
3707        SELECT * FROM enriched"#;
3708        let expr = parse(sql);
3709
3710        let mut schema = MappingSchema::new();
3711        let cols = vec![
3712            ("order_id".to_string(), DataType::Unknown),
3713            ("total".to_string(), DataType::Unknown),
3714        ];
3715        schema.add_table("stg_orders", &cols, None).unwrap();
3716
3717        let node =
3718            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3719                .unwrap();
3720        assert_eq!(node.name, "order_id");
3721
3722        // Also verify the extra column works
3723        let extra =
3724            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3725        assert_eq!(extra.name, "extra");
3726    }
3727
3728    #[test]
3729    fn test_lineage_cte_star_without_schema_still_works() {
3730        // Without schema, CTE-to-CTE star expansion still works
3731        let sql = r#"WITH
3732            cte1 AS (SELECT id, name FROM raw_table),
3733            cte2 AS (SELECT * FROM cte1)
3734        SELECT * FROM cte2"#;
3735        let expr = parse(sql);
3736
3737        // No schema — should still resolve through CTE chain
3738        let node = lineage("id", &expr, None, false).unwrap();
3739        assert_eq!(node.name, "id");
3740    }
3741
3742    #[test]
3743    fn test_lineage_nested_cte_star_with_join_and_schema() {
3744        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
3745        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
3746        let sql = r#"WITH
3747base_orders AS (
3748    SELECT * FROM stg_orders
3749),
3750with_payments AS (
3751    SELECT
3752        base_orders.*,
3753        p.amount
3754    FROM base_orders
3755    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3756),
3757final_cte AS (
3758    SELECT * FROM with_payments
3759)
3760SELECT * FROM final_cte"#;
3761        let expr = parse(sql);
3762
3763        let mut schema = MappingSchema::new();
3764        let order_cols = vec![
3765            (
3766                "order_id".to_string(),
3767                crate::expressions::DataType::Unknown,
3768            ),
3769            (
3770                "customer_id".to_string(),
3771                crate::expressions::DataType::Unknown,
3772            ),
3773            ("status".to_string(), crate::expressions::DataType::Unknown),
3774        ];
3775        let pay_cols = vec![
3776            (
3777                "payment_id".to_string(),
3778                crate::expressions::DataType::Unknown,
3779            ),
3780            (
3781                "order_id".to_string(),
3782                crate::expressions::DataType::Unknown,
3783            ),
3784            ("amount".to_string(), crate::expressions::DataType::Unknown),
3785        ];
3786        schema.add_table("stg_orders", &order_cols, None).unwrap();
3787        schema.add_table("stg_payments", &pay_cols, None).unwrap();
3788
3789        // order_id should trace back to stg_orders
3790        let node =
3791            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3792                .unwrap();
3793        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3794
3795        // The leaf should be "stg_orders.order_id" (not just "order_id")
3796        let has_table_qualified = all_names
3797            .iter()
3798            .any(|n| n.contains('.') && n.contains("order_id"));
3799        assert!(
3800            has_table_qualified,
3801            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3802            all_names
3803        );
3804
3805        // amount should trace back to stg_payments
3806        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3807            .unwrap();
3808        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3809
3810        let has_table_qualified = all_names
3811            .iter()
3812            .any(|n| n.contains('.') && n.contains("amount"));
3813        assert!(
3814            has_table_qualified,
3815            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3816            all_names
3817        );
3818    }
3819
3820    #[test]
3821    fn test_lineage_cte_alias_resolution() {
3822        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
3823        let sql = r#"WITH import_stg_items AS (
3824    SELECT item_id, name, status FROM stg_items
3825)
3826SELECT base.item_id, base.status
3827FROM import_stg_items AS base"#;
3828        let expr = parse(sql);
3829
3830        let node = lineage("item_id", &expr, None, false).unwrap();
3831        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3832        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
3833        assert!(
3834            all_names.iter().any(|n| n == "stg_items.item_id"),
3835            "Expected leaf 'stg_items.item_id', got: {:?}",
3836            all_names
3837        );
3838    }
3839
3840    #[test]
3841    fn test_lineage_cte_alias_with_schema_and_star() {
3842        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
3843        let sql = r#"WITH import_stg AS (
3844    SELECT * FROM stg_items
3845)
3846SELECT base.item_id, base.status
3847FROM import_stg AS base"#;
3848        let expr = parse(sql);
3849
3850        let mut schema = MappingSchema::new();
3851        schema
3852            .add_table(
3853                "stg_items",
3854                &[
3855                    ("item_id".to_string(), DataType::Unknown),
3856                    ("name".to_string(), DataType::Unknown),
3857                    ("status".to_string(), DataType::Unknown),
3858                ],
3859                None,
3860            )
3861            .unwrap();
3862
3863        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3864            .unwrap();
3865        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3866        assert!(
3867            all_names.iter().any(|n| n == "stg_items.item_id"),
3868            "Expected leaf 'stg_items.item_id', got: {:?}",
3869            all_names
3870        );
3871    }
3872
3873    #[test]
3874    fn test_lineage_cte_alias_with_join() {
3875        // Multiple CTE aliases in a JOIN: each should resolve independently
3876        let sql = r#"WITH
3877    import_users AS (SELECT id, name FROM users),
3878    import_orders AS (SELECT id, user_id, amount FROM orders)
3879SELECT u.name, o.amount
3880FROM import_users AS u
3881LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3882        let expr = parse(sql);
3883
3884        let node = lineage("name", &expr, None, false).unwrap();
3885        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3886        assert!(
3887            all_names.iter().any(|n| n == "users.name"),
3888            "Expected leaf 'users.name', got: {:?}",
3889            all_names
3890        );
3891
3892        let node = lineage("amount", &expr, None, false).unwrap();
3893        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3894        assert!(
3895            all_names.iter().any(|n| n == "orders.amount"),
3896            "Expected leaf 'orders.amount', got: {:?}",
3897            all_names
3898        );
3899    }
3900
3901    // -----------------------------------------------------------------------
3902    // Quoted CTE name tests — verifying SQL identifier case semantics
3903    // -----------------------------------------------------------------------
3904
3905    #[test]
3906    fn test_lineage_unquoted_cte_case_insensitive() {
3907        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
3908        // MyCte and MYCTE should match.
3909        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3910        let node = lineage("col", &expr, None, false).unwrap();
3911        assert_eq!(node.name, "col");
3912        assert!(
3913            !node.downstream.is_empty(),
3914            "Unquoted CTE should resolve case-insensitively"
3915        );
3916    }
3917
3918    #[test]
3919    fn test_lineage_quoted_cte_case_preserved() {
3920        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
3921        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3922        let node = lineage("col", &expr, None, false).unwrap();
3923        assert_eq!(node.name, "col");
3924        assert!(
3925            !node.downstream.is_empty(),
3926            "Quoted CTE with matching case should resolve"
3927        );
3928    }
3929
3930    #[test]
3931    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3932        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
3933        // sqlglot treats this as a table reference, not a CTE match.
3934        // Star expansion should NOT resolve through the CTE.
3935        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3936        // lineage("col", ...) should fail because "mycte" is treated as an external
3937        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
3938        let result = lineage("col", &expr, None, false);
3939        assert!(
3940            result.is_err(),
3941            "Quoted CTE with case mismatch should not expand star: {:?}",
3942            result
3943        );
3944    }
3945
3946    #[test]
3947    fn test_lineage_mixed_quoted_unquoted_cte() {
3948        // Mix of unquoted and quoted CTEs in a nested chain.
3949        let expr = parse(
3950            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3951        );
3952        let node = lineage("a", &expr, None, false).unwrap();
3953        assert_eq!(node.name, "a");
3954        assert!(
3955            !node.downstream.is_empty(),
3956            "Mixed quoted/unquoted CTE chain should resolve"
3957        );
3958    }
3959
3960    // -----------------------------------------------------------------------
3961    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
3962    // -----------------------------------------------------------------------
3963    //
3964    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
3965    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
3966    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
3967    // direct string comparison for CTE name matching, ignoring the quoted status.
3968    //
3969    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
3970    // and unquoted as case-insensitive. The scope system should do the same.
3971    //
3972    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
3973    // which is broader than the star expansion scope of this PR.
3974
3975    #[test]
3976    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3977        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
3978        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
3979        // as "mycte" incorrectly resolves to the CTE.
3980        //
3981        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
3982        // case-sensitive: "mycte" should NOT match CTE "MyCte".
3983        //
3984        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3985        // this test should fail — update the assertion to match correct behavior:
3986        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
3987        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3988        let node = lineage("col", &expr, None, false).unwrap();
3989        assert!(!node.downstream.is_empty());
3990        let child = &node.downstream[0];
3991        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
3992        assert_eq!(
3993            child.source_name, "MyCte",
3994            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3995             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3996        );
3997    }
3998
3999    #[test]
4000    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
4001        // Known bug: same as above but with qualified column reference ("mycte".col).
4002        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
4003        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
4004        //
4005        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
4006        // this test should fail — update to assert source_name != "MyCte".
4007        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
4008        let node = lineage("col", &expr, None, false).unwrap();
4009        assert!(!node.downstream.is_empty());
4010        let child = &node.downstream[0];
4011        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
4012        assert_eq!(
4013            child.source_name, "MyCte",
4014            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
4015             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
4016        );
4017    }
4018
4019    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
4020
4021    /// sqlglot: test_node_name_doesnt_contain_comment
4022    /// Comments in column expressions should not affect lineage resolution.
4023    /// NOTE: This test uses SELECT * from a derived table, which is a separate
4024    /// known limitation in polyglot-sql (star expansion in subqueries).
4025    #[test]
4026    #[ignore = "requires derived table star expansion (separate issue)"]
4027    fn test_node_name_doesnt_contain_comment() {
4028        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
4029        let node = lineage("x", &expr, None, false).unwrap();
4030
4031        assert_eq!(node.name, "x");
4032        assert!(!node.downstream.is_empty());
4033    }
4034
4035    /// A line comment between SELECT and the first column wraps the column
4036    /// in an Annotated node. Lineage must unwrap it to find the column name.
4037    /// Verify that commented and uncommented queries produce identical lineage.
4038    #[test]
4039    fn test_comment_before_first_column_in_cte() {
4040        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
4041        let sql_without_comment = "with t as (select 1 as a) select a from t";
4042
4043        // Without comment — baseline
4044        let expr_ok = parse(sql_without_comment);
4045        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
4046
4047        // With comment — should produce identical lineage
4048        let expr_comment = parse(sql_with_comment);
4049        let node_comment = lineage("a", &expr_comment, None, false)
4050            .expect("with comment before first column should succeed");
4051
4052        assert_eq!(node_ok.name, node_comment.name, "node names should match");
4053        assert_eq!(
4054            node_ok.downstream_names(),
4055            node_comment.downstream_names(),
4056            "downstream lineage should be identical with or without comment"
4057        );
4058    }
4059
4060    /// Block comment between SELECT and first column.
4061    #[test]
4062    fn test_block_comment_before_first_column() {
4063        let sql = "with t as (select 1 as a) select /* section */ a from t";
4064        let expr = parse(sql);
4065        let node = lineage("a", &expr, None, false)
4066            .expect("block comment before first column should succeed");
4067        assert_eq!(node.name, "a");
4068        assert!(
4069            !node.downstream.is_empty(),
4070            "should have downstream lineage"
4071        );
4072    }
4073
4074    /// Comment before first column should not affect second column resolution.
4075    #[test]
4076    fn test_comment_before_first_column_second_col_ok() {
4077        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
4078        let expr = parse(sql);
4079
4080        let node_a =
4081            lineage("a", &expr, None, false).expect("column a with comment should succeed");
4082        assert_eq!(node_a.name, "a");
4083
4084        let node_b =
4085            lineage("b", &expr, None, false).expect("column b with comment should succeed");
4086        assert_eq!(node_b.name, "b");
4087    }
4088
4089    /// Aliased column with preceding comment.
4090    #[test]
4091    fn test_comment_before_aliased_column() {
4092        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
4093        let expr = parse(sql);
4094        let node =
4095            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
4096        assert_eq!(node.name, "y");
4097        assert!(
4098            !node.downstream.is_empty(),
4099            "aliased column should have downstream lineage"
4100        );
4101    }
4102}