Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Optional reference node name (e.g., for CTEs)
33    pub reference_node_name: String,
34}
35
36impl LineageNode {
37    /// Create a new lineage node
38    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39        Self {
40            name: name.into(),
41            expression,
42            source,
43            downstream: Vec::new(),
44            source_name: String::new(),
45            reference_node_name: String::new(),
46        }
47    }
48
49    /// Iterate over all nodes in the lineage graph using DFS
50    pub fn walk(&self) -> LineageWalker<'_> {
51        LineageWalker { stack: vec![self] }
52    }
53
54    /// Get all downstream column names
55    pub fn downstream_names(&self) -> Vec<String> {
56        self.downstream.iter().map(|n| n.name.clone()).collect()
57    }
58}
59
60/// Iterator for walking the lineage graph
61pub struct LineageWalker<'a> {
62    stack: Vec<&'a LineageNode>,
63}
64
65impl<'a> Iterator for LineageWalker<'a> {
66    type Item = &'a LineageNode;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        if let Some(node) = self.stack.pop() {
70            // Add children in reverse order so they're visited in order
71            for child in node.downstream.iter().rev() {
72                self.stack.push(child);
73            }
74            Some(node)
75        } else {
76            None
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// ColumnRef: name or positional index for column lookup
83// ---------------------------------------------------------------------------
84
/// Column reference for lineage tracing — by name or positional index.
///
/// Both variants hold only `Copy` data (a borrowed string slice or a `usize`), so the
/// enum derives `Copy` and can be passed by value without cloning. `Debug` and the
/// equality traits make it usable in diagnostics and assertions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ColumnRef<'a> {
    /// Look the column up by its output name.
    Name(&'a str),
    /// Look the column up by its zero-based position in the select list.
    Index(usize),
}
90
91// ---------------------------------------------------------------------------
92// Public API
93// ---------------------------------------------------------------------------
94
95/// Build the lineage graph for a column in a SQL query
96///
97/// # Arguments
98/// * `column` - The column name to trace lineage for
99/// * `sql` - The SQL expression (SELECT, UNION, etc.)
100/// * `dialect` - Optional dialect for parsing
101/// * `trim_selects` - If true, trim the source SELECT to only include the target column
102///
103/// # Returns
104/// The root lineage node for the specified column
105///
106/// # Example
107/// ```ignore
108/// use polyglot_sql::lineage::lineage;
109/// use polyglot_sql::parse_one;
110/// use polyglot_sql::DialectType;
111///
112/// let sql = "SELECT a, b + 1 AS c FROM t";
113/// let expr = parse_one(sql, DialectType::Generic).unwrap();
114/// let node = lineage("c", &expr, None, false).unwrap();
115/// ```
116pub fn lineage(
117    column: &str,
118    sql: &Expression,
119    dialect: Option<DialectType>,
120    trim_selects: bool,
121) -> Result<LineageNode> {
122    // Fast path: skip clone when there are no CTEs to expand
123    let has_with = matches!(sql, Expression::Select(s) if s.with.is_some());
124    if !has_with {
125        return lineage_from_expression(column, sql, dialect, trim_selects);
126    }
127    let mut owned = sql.clone();
128    expand_cte_stars(&mut owned, None);
129    lineage_from_expression(column, &owned, dialect, trim_selects)
130}
131
132/// Build the lineage graph for a column in a SQL query using optional schema metadata.
133///
134/// When `schema` is provided, the query is first qualified with
135/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
136/// ambiguous column references.
137///
138/// # Arguments
139/// * `column` - The column name to trace lineage for
140/// * `sql` - The SQL expression (SELECT, UNION, etc.)
141/// * `schema` - Optional schema used for qualification
142/// * `dialect` - Optional dialect for qualification and lineage handling
143/// * `trim_selects` - If true, trim the source SELECT to only include the target column
144///
145/// # Returns
146/// The root lineage node for the specified column
147pub fn lineage_with_schema(
148    column: &str,
149    sql: &Expression,
150    schema: Option<&dyn Schema>,
151    dialect: Option<DialectType>,
152    trim_selects: bool,
153) -> Result<LineageNode> {
154    let mut qualified_expression = if let Some(schema) = schema {
155        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
156            QualifyColumnsOptions::new().with_dialect(dialect_type)
157        } else {
158            QualifyColumnsOptions::new()
159        };
160
161        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
162            Error::internal(format!("Lineage qualification failed with schema: {}", e))
163        })?
164    } else {
165        sql.clone()
166    };
167
168    // Annotate types in-place so lineage nodes carry type information
169    annotate_types(&mut qualified_expression, schema, dialect);
170
171    // Expand CTE stars on the already-owned expression (no extra clone).
172    // Pass schema so that stars from external tables can also be resolved.
173    expand_cte_stars(&mut qualified_expression, schema);
174
175    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
176}
177
178fn lineage_from_expression(
179    column: &str,
180    sql: &Expression,
181    dialect: Option<DialectType>,
182    trim_selects: bool,
183) -> Result<LineageNode> {
184    let scope = build_scope(sql);
185    to_node(
186        ColumnRef::Name(column),
187        &scope,
188        dialect,
189        "",
190        "",
191        "",
192        trim_selects,
193    )
194}
195
196// ---------------------------------------------------------------------------
197// CTE star expansion
198// ---------------------------------------------------------------------------
199
200/// Normalize an identifier for CTE name matching.
201///
202/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
203/// quoted identifiers preserve their original case. This matches sqlglot's
204/// `normalize_identifiers` behavior.
205fn normalize_cte_name(ident: &Identifier) -> String {
206    if ident.quoted {
207        ident.name.clone()
208    } else {
209        ident.name.to_lowercase()
210    }
211}
212
213/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
214/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
215/// which qualify_columns cannot resolve because it processes each SELECT independently.
216///
217/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
218/// by looking up column names in the schema. This enables correct expansion of patterns
219/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
220///
221/// CTE name matching follows SQL identifier semantics: unquoted names are compared
222/// case-insensitively (lowercased), while quoted names preserve their original case.
223/// This matches sqlglot's `normalize_identifiers` behavior.
224pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
225    let select = match expr {
226        Expression::Select(s) => s,
227        _ => return,
228    };
229
230    let with = match &mut select.with {
231        Some(w) => w,
232        None => return,
233    };
234
235    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
236
237    for cte in &mut with.ctes {
238        let cte_name = normalize_cte_name(&cte.alias);
239
240        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
241        if !cte.columns.is_empty() {
242            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
243            resolved_cte_columns.insert(cte_name, cols);
244            continue;
245        }
246
247        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
248        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
249        // references itself. We detect this conservatively: if the CTE name appears as
250        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
251        // block are still expanded.
252        if with.recursive {
253            let is_self_referencing =
254                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
255                    let body_sources = get_select_sources(body_select);
256                    body_sources.iter().any(|s| s.normalized == cte_name)
257                } else {
258                    false
259                };
260            if is_self_referencing {
261                continue;
262            }
263        }
264
265        // Get the SELECT from the CTE body (handle UNION by taking left branch)
266        let body_select = match get_leftmost_select_mut(&mut cte.this) {
267            Some(s) => s,
268            None => continue,
269        };
270
271        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
272        resolved_cte_columns.insert(cte_name, columns);
273    }
274
275    // Also expand stars in the outer SELECT itself
276    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
277}
278
279/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
280///
281/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
282/// are determined by the left branch. This matches sqlglot's behavior.
283fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
284    let mut current = expr;
285    for _ in 0..MAX_LINEAGE_DEPTH {
286        match current {
287            Expression::Select(s) => return Some(s),
288            Expression::Union(u) => current = &mut u.left,
289            Expression::Intersect(i) => current = &mut i.left,
290            Expression::Except(e) => current = &mut e.left,
291            Expression::Paren(p) => current = &mut p.this,
292            _ => return None,
293        }
294    }
295    None
296}
297
298/// Rewrite star expressions in a SELECT using resolved CTE column lists.
299/// Falls back to `schema` for external table column lookup.
300/// Returns the list of output column names after expansion.
301fn rewrite_stars_in_select(
302    select: &mut Select,
303    resolved_ctes: &HashMap<String, Vec<String>>,
304    schema: Option<&dyn Schema>,
305) -> Vec<String> {
306    // The AST represents star expressions in two forms depending on syntax:
307    //   - `SELECT *`      → Expression::Star (unqualified star)
308    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
309    // Both must be checked to handle all star patterns.
310    let has_star = select
311        .expressions
312        .iter()
313        .any(|e| matches!(e, Expression::Star(_)));
314    let has_qualified_star = select
315        .expressions
316        .iter()
317        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
318
319    if !has_star && !has_qualified_star {
320        // No stars — just extract column names without rewriting
321        return select
322            .expressions
323            .iter()
324            .filter_map(get_expression_output_name)
325            .collect();
326    }
327
328    let sources = get_select_sources(select);
329    let mut new_expressions = Vec::new();
330    let mut result_columns = Vec::new();
331
332    for expr in &select.expressions {
333        match expr {
334            Expression::Star(star) => {
335                let qual = star.table.as_ref();
336                if let Some(expanded) =
337                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
338                {
339                    for (src_alias, col_name) in &expanded {
340                        let table_id = Identifier::new(src_alias);
341                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
342                        result_columns.push(col_name.clone());
343                    }
344                } else {
345                    new_expressions.push(expr.clone());
346                    result_columns.push("*".to_string());
347                }
348            }
349            Expression::Column(c) if c.name.name == "*" => {
350                let qual = c.table.as_ref();
351                if let Some(expanded) =
352                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
353                {
354                    for (_src_alias, col_name) in &expanded {
355                        // Keep the original table qualifier for qualified stars (table.*)
356                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
357                        result_columns.push(col_name.clone());
358                    }
359                } else {
360                    new_expressions.push(expr.clone());
361                    result_columns.push("*".to_string());
362                }
363            }
364            _ => {
365                new_expressions.push(expr.clone());
366                if let Some(name) = get_expression_output_name(expr) {
367                    result_columns.push(name);
368                }
369            }
370        }
371    }
372
373    select.expressions = new_expressions;
374    result_columns
375}
376
377/// Try to expand a star expression by looking up source columns from resolved CTEs,
378/// falling back to the schema for external tables.
379/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
380/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
381fn expand_star_from_sources(
382    qualifier: Option<&Identifier>,
383    sources: &[SourceInfo],
384    resolved_ctes: &HashMap<String, Vec<String>>,
385    schema: Option<&dyn Schema>,
386) -> Option<Vec<(String, String)>> {
387    let mut expanded = Vec::new();
388
389    if let Some(qual) = qualifier {
390        // Qualified star: table.*
391        let qual_normalized = normalize_cte_name(qual);
392        for src in sources {
393            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
394                // Try CTE first
395                if let Some(cols) = resolved_ctes.get(&src.normalized) {
396                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
397                    return Some(expanded);
398                }
399                // Fall back to schema
400                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
401                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
402                    return Some(expanded);
403                }
404            }
405        }
406        None
407    } else {
408        // Unqualified star: expand all sources.
409        // Intentionally conservative: if any source can't be resolved, the entire
410        // expansion is aborted. Partial expansion would produce an incomplete column
411        // list, causing downstream lineage resolution to silently omit columns.
412        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
413        let mut any_expanded = false;
414        for src in sources {
415            if let Some(cols) = resolved_ctes.get(&src.normalized) {
416                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
417                any_expanded = true;
418            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
419                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
420                any_expanded = true;
421            } else {
422                return None;
423            }
424        }
425        if any_expanded {
426            Some(expanded)
427        } else {
428            None
429        }
430    }
431}
432
433/// Look up column names for a table from the schema.
434fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
435    let schema = schema?;
436    if fq_name.is_empty() {
437        return None;
438    }
439    schema
440        .column_names(fq_name)
441        .ok()
442        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
443}
444
445/// Create a Column expression with the given name and optional table qualifier.
446fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
447    Expression::Column(Box::new(crate::expressions::Column {
448        name: Identifier::new(name),
449        table: table.cloned(),
450        join_mark: false,
451        trailing_comments: Vec::new(),
452        span: None,
453        inferred_type: None,
454    }))
455}
456
457/// Extract the output name of a SELECT expression.
458fn get_expression_output_name(expr: &Expression) -> Option<String> {
459    match expr {
460        Expression::Alias(a) => Some(a.alias.name.clone()),
461        Expression::Column(c) => Some(c.name.name.clone()),
462        Expression::Identifier(id) => Some(id.name.clone()),
463        Expression::Star(_) => Some("*".to_string()),
464        _ => None,
465    }
466}
467
468/// Source info extracted from a SELECT's FROM/JOIN clauses in a single pass.
469struct SourceInfo {
470    alias: String,
471    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
472    normalized: String,
473    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
474    fq_name: String,
475}
476
477/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
478/// SELECT's FROM and JOIN clauses in a single pass.
479fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
480    let mut sources = Vec::new();
481
482    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
483        match expr {
484            Expression::Table(t) => {
485                let normalized = normalize_cte_name(&t.name);
486                let alias = t
487                    .alias
488                    .as_ref()
489                    .map(|a| a.name.clone())
490                    .unwrap_or_else(|| t.name.name.clone());
491                let mut parts = Vec::new();
492                if let Some(catalog) = &t.catalog {
493                    parts.push(catalog.name.clone());
494                }
495                if let Some(schema) = &t.schema {
496                    parts.push(schema.name.clone());
497                }
498                parts.push(t.name.name.clone());
499                let fq_name = parts.join(".");
500                Some(SourceInfo {
501                    alias,
502                    normalized,
503                    fq_name,
504                })
505            }
506            Expression::Subquery(s) => {
507                let alias = s.alias.as_ref()?.name.clone();
508                let normalized = alias.to_lowercase();
509                let fq_name = alias.clone();
510                Some(SourceInfo {
511                    alias,
512                    normalized,
513                    fq_name,
514                })
515            }
516            Expression::Paren(p) => extract_source(&p.this),
517            _ => None,
518        }
519    }
520
521    if let Some(from) = &select.from {
522        for expr in &from.expressions {
523            if let Some(info) = extract_source(expr) {
524                sources.push(info);
525            }
526        }
527    }
528    for join in &select.joins {
529        if let Some(info) = extract_source(&join.this) {
530            sources.push(info);
531        }
532    }
533    sources
534}
535
536/// Get all source tables from a lineage graph
537pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
538    let mut tables = HashSet::new();
539    collect_source_tables(node, &mut tables);
540    tables
541}
542
543/// Recursively collect source table names from lineage graph
544pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
545    if let Expression::Table(table) = &node.source {
546        tables.insert(table.name.name.clone());
547    }
548    for child in &node.downstream {
549        collect_source_tables(child, tables);
550    }
551}
552
553// ---------------------------------------------------------------------------
554// Core recursive lineage builder
555// ---------------------------------------------------------------------------
556
/// Maximum recursion depth for lineage tracing to prevent stack overflow
/// on circular or deeply nested CTE chains.
///
/// `to_node_inner` returns an error once its `depth` argument exceeds this value.
/// `get_leftmost_select_mut` uses the same number as the limit on how many nested
/// nodes it inspects while looking for the leftmost SELECT.
const MAX_LINEAGE_DEPTH: usize = 64;
560
561/// Recursively build a lineage node for a column in a scope.
562fn to_node(
563    column: ColumnRef<'_>,
564    scope: &Scope,
565    dialect: Option<DialectType>,
566    scope_name: &str,
567    source_name: &str,
568    reference_node_name: &str,
569    trim_selects: bool,
570) -> Result<LineageNode> {
571    to_node_inner(
572        column,
573        scope,
574        dialect,
575        scope_name,
576        source_name,
577        reference_node_name,
578        trim_selects,
579        &[],
580        0,
581    )
582}
583
584fn to_node_inner(
585    column: ColumnRef<'_>,
586    scope: &Scope,
587    dialect: Option<DialectType>,
588    scope_name: &str,
589    source_name: &str,
590    reference_node_name: &str,
591    trim_selects: bool,
592    ancestor_cte_scopes: &[Scope],
593    depth: usize,
594) -> Result<LineageNode> {
595    if depth > MAX_LINEAGE_DEPTH {
596        return Err(Error::internal(format!(
597            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
598        )));
599    }
600    let scope_expr = &scope.expression;
601
602    // Build combined CTE scopes: current scope's cte_scopes + ancestors
603    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
604    for s in ancestor_cte_scopes {
605        all_cte_scopes.push(s);
606    }
607
608    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
609    //    but we need the inner query (SELECT/UNION) for column lookup.
610    let effective_expr = match scope_expr {
611        Expression::Cte(cte) => &cte.this,
612        other => other,
613    };
614
615    // 1. Set operations (UNION / INTERSECT / EXCEPT)
616    if matches!(
617        effective_expr,
618        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
619    ) {
620        // For CTE wrapping a set op, create a temporary scope with the inner expression
621        if matches!(scope_expr, Expression::Cte(_)) {
622            let mut inner_scope = Scope::new(effective_expr.clone());
623            inner_scope.union_scopes = scope.union_scopes.clone();
624            inner_scope.sources = scope.sources.clone();
625            inner_scope.cte_sources = scope.cte_sources.clone();
626            inner_scope.cte_scopes = scope.cte_scopes.clone();
627            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
628            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
629            return handle_set_operation(
630                &column,
631                &inner_scope,
632                dialect,
633                scope_name,
634                source_name,
635                reference_node_name,
636                trim_selects,
637                ancestor_cte_scopes,
638                depth,
639            );
640        }
641        return handle_set_operation(
642            &column,
643            scope,
644            dialect,
645            scope_name,
646            source_name,
647            reference_node_name,
648            trim_selects,
649            ancestor_cte_scopes,
650            depth,
651        );
652    }
653
654    // 2. Find the select expression for this column
655    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
656    let column_name = resolve_column_name(&column, &select_expr);
657
658    // 3. Trim source if requested
659    let node_source = if trim_selects {
660        trim_source(effective_expr, &select_expr)
661    } else {
662        effective_expr.clone()
663    };
664
665    // 4. Create the lineage node
666    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
667    node.source_name = source_name.to_string();
668    node.reference_node_name = reference_node_name.to_string();
669
670    // 5. Star handling — add downstream for each source
671    if matches!(&select_expr, Expression::Star(_)) {
672        for (name, source_info) in &scope.sources {
673            let child = LineageNode::new(
674                format!("{}.*", name),
675                Expression::Star(crate::expressions::Star {
676                    table: None,
677                    except: None,
678                    replace: None,
679                    rename: None,
680                    trailing_comments: vec![],
681                    span: None,
682                }),
683                source_info.expression.clone(),
684            );
685            node.downstream.push(child);
686        }
687        return Ok(node);
688    }
689
690    // 6. Subqueries in select — trace through scalar subqueries
691    let subqueries: Vec<&Expression> =
692        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
693    for sq_expr in subqueries {
694        if let Expression::Subquery(sq) = sq_expr {
695            for sq_scope in &scope.subquery_scopes {
696                if sq_scope.expression == sq.this {
697                    if let Ok(child) = to_node_inner(
698                        ColumnRef::Index(0),
699                        sq_scope,
700                        dialect,
701                        &column_name,
702                        "",
703                        "",
704                        trim_selects,
705                        ancestor_cte_scopes,
706                        depth + 1,
707                    ) {
708                        node.downstream.push(child);
709                    }
710                    break;
711                }
712            }
713        }
714    }
715
716    // 7. Column references — trace each column to its source
717    let col_refs = find_column_refs_in_expr(&select_expr, dialect);
718    for col_ref in col_refs {
719        let col_name = &col_ref.column;
720        if let Some(ref table_id) = col_ref.table {
721            let tbl = &table_id.name;
722            resolve_qualified_column(
723                &mut node,
724                scope,
725                dialect,
726                tbl,
727                col_name,
728                &column_name,
729                trim_selects,
730                &all_cte_scopes,
731                depth,
732            );
733        } else {
734            resolve_unqualified_column(
735                &mut node,
736                scope,
737                dialect,
738                col_name,
739                &column_name,
740                trim_selects,
741                &all_cte_scopes,
742                depth,
743            );
744        }
745    }
746
747    Ok(node)
748}
749
750// ---------------------------------------------------------------------------
751// Set operation handling
752// ---------------------------------------------------------------------------
753
754fn handle_set_operation(
755    column: &ColumnRef<'_>,
756    scope: &Scope,
757    dialect: Option<DialectType>,
758    scope_name: &str,
759    source_name: &str,
760    reference_node_name: &str,
761    trim_selects: bool,
762    ancestor_cte_scopes: &[Scope],
763    depth: usize,
764) -> Result<LineageNode> {
765    let scope_expr = &scope.expression;
766
767    // Determine column index
768    let col_index = match column {
769        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
770        ColumnRef::Index(i) => *i,
771    };
772
773    let col_name = match column {
774        ColumnRef::Name(name) => name.to_string(),
775        ColumnRef::Index(_) => format!("_{col_index}"),
776    };
777
778    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
779    node.source_name = source_name.to_string();
780    node.reference_node_name = reference_node_name.to_string();
781
782    // Recurse into each union branch
783    for branch_scope in &scope.union_scopes {
784        if let Ok(child) = to_node_inner(
785            ColumnRef::Index(col_index),
786            branch_scope,
787            dialect,
788            scope_name,
789            "",
790            "",
791            trim_selects,
792            ancestor_cte_scopes,
793            depth + 1,
794        ) {
795            node.downstream.push(child);
796        }
797    }
798
799    Ok(node)
800}
801
802// ---------------------------------------------------------------------------
803// Column resolution helpers
804// ---------------------------------------------------------------------------
805
806fn resolve_qualified_column(
807    node: &mut LineageNode,
808    scope: &Scope,
809    dialect: Option<DialectType>,
810    table: &str,
811    col_name: &str,
812    parent_name: &str,
813    trim_selects: bool,
814    all_cte_scopes: &[&Scope],
815    depth: usize,
816) {
817    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
818    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
819    let resolved_cte_name = resolve_cte_alias(scope, table);
820    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
821
822    // Check if table is a CTE reference — check both the current scope's cte_sources
823    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
824    let is_cte = scope.cte_sources.contains_key(effective_table)
825        || all_cte_scopes.iter().any(
826            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
827        );
828    if is_cte {
829        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
830            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
831            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
832            if let Ok(child) = to_node_inner(
833                ColumnRef::Name(col_name),
834                child_scope,
835                dialect,
836                parent_name,
837                effective_table,
838                parent_name,
839                trim_selects,
840                &ancestors,
841                depth + 1,
842            ) {
843                node.downstream.push(child);
844                return;
845            }
846        }
847    }
848
849    // Check if table is a derived table (is_scope = true in sources)
850    if let Some(source_info) = scope.sources.get(table) {
851        if source_info.is_scope {
852            if let Some(child_scope) = find_child_scope(scope, table) {
853                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
854                if let Ok(child) = to_node_inner(
855                    ColumnRef::Name(col_name),
856                    child_scope,
857                    dialect,
858                    parent_name,
859                    table,
860                    parent_name,
861                    trim_selects,
862                    &ancestors,
863                    depth + 1,
864                ) {
865                    node.downstream.push(child);
866                    return;
867                }
868            }
869        }
870    }
871
872    // Base table source found in current scope: preserve alias in the display name
873    // but store the resolved table expression and name for downstream consumers.
874    if let Some(source_info) = scope.sources.get(table) {
875        if !source_info.is_scope {
876            node.downstream.push(make_table_column_node_from_source(
877                table,
878                col_name,
879                &source_info.expression,
880            ));
881            return;
882        }
883    }
884
885    // Base table or unresolved — terminal node
886    node.downstream
887        .push(make_table_column_node(table, col_name));
888}
889
890/// Resolve a FROM alias to the original CTE name.
891///
892/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
893/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
894/// This function checks if `name` is such an alias and returns the CTE name.
895fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
896    // If it's already a known CTE name, no resolution needed
897    if scope.cte_sources.contains_key(name) {
898        return None;
899    }
900    // Check if the source's expression is a CTE — if so, extract the CTE name
901    if let Some(source_info) = scope.sources.get(name) {
902        if source_info.is_scope {
903            if let Expression::Cte(cte) = &source_info.expression {
904                let cte_name = &cte.alias.name;
905                if scope.cte_sources.contains_key(cte_name) {
906                    return Some(cte_name.clone());
907                }
908            }
909        }
910    }
911    None
912}
913
914fn resolve_unqualified_column(
915    node: &mut LineageNode,
916    scope: &Scope,
917    dialect: Option<DialectType>,
918    col_name: &str,
919    parent_name: &str,
920    trim_selects: bool,
921    all_cte_scopes: &[&Scope],
922    depth: usize,
923) {
924    // Try to find which source this column belongs to.
925    // Build the source list from the actual FROM/JOIN clauses to avoid
926    // mixing in CTE definitions that are in scope but not referenced.
927    let from_source_names = source_names_from_from_join(scope);
928
929    if from_source_names.len() == 1 {
930        let tbl = &from_source_names[0];
931        resolve_qualified_column(
932            node,
933            scope,
934            dialect,
935            tbl,
936            col_name,
937            parent_name,
938            trim_selects,
939            all_cte_scopes,
940            depth,
941        );
942        return;
943    }
944
945    // Multiple sources — can't resolve without schema info, add unqualified node
946    let child = LineageNode::new(
947        col_name.to_string(),
948        Expression::Column(Box::new(crate::expressions::Column {
949            name: crate::expressions::Identifier::new(col_name.to_string()),
950            table: None,
951            join_mark: false,
952            trailing_comments: vec![],
953            span: None,
954            inferred_type: None,
955        })),
956        node.source.clone(),
957    );
958    node.downstream.push(child);
959}
960
961fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
962    fn source_name(expr: &Expression) -> Option<String> {
963        match expr {
964            Expression::Table(table) => Some(
965                table
966                    .alias
967                    .as_ref()
968                    .map(|a| a.name.clone())
969                    .unwrap_or_else(|| table.name.name.clone()),
970            ),
971            Expression::Subquery(subquery) => {
972                subquery.alias.as_ref().map(|alias| alias.name.clone())
973            }
974            Expression::Paren(paren) => source_name(&paren.this),
975            _ => None,
976        }
977    }
978
979    let effective_expr = match &scope.expression {
980        Expression::Cte(cte) => &cte.this,
981        expr => expr,
982    };
983
984    let mut names = Vec::new();
985    let mut seen = std::collections::HashSet::new();
986
987    if let Expression::Select(select) = effective_expr {
988        if let Some(from) = &select.from {
989            for expr in &from.expressions {
990                if let Some(name) = source_name(expr) {
991                    if !name.is_empty() && seen.insert(name.clone()) {
992                        names.push(name);
993                    }
994                }
995            }
996        }
997        for join in &select.joins {
998            if let Some(name) = source_name(&join.this) {
999                if !name.is_empty() && seen.insert(name.clone()) {
1000                    names.push(name);
1001                }
1002            }
1003        }
1004    }
1005
1006    names
1007}
1008
1009// ---------------------------------------------------------------------------
1010// Helper functions
1011// ---------------------------------------------------------------------------
1012
1013/// Get the alias or name of an expression
1014fn get_alias_or_name(expr: &Expression) -> Option<String> {
1015    match expr {
1016        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1017        Expression::Column(col) => Some(col.name.name.clone()),
1018        Expression::Identifier(id) => Some(id.name.clone()),
1019        Expression::Star(_) => Some("*".to_string()),
1020        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1021        // Unwrap to get the actual column/alias name from the inner expression.
1022        Expression::Annotated(a) => get_alias_or_name(&a.this),
1023        _ => None,
1024    }
1025}
1026
1027/// Resolve the display name for a column reference.
1028fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1029    match column {
1030        ColumnRef::Name(n) => n.to_string(),
1031        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1032    }
1033}
1034
1035/// Find the select expression matching a column reference.
1036fn find_select_expr(
1037    scope_expr: &Expression,
1038    column: &ColumnRef<'_>,
1039    dialect: Option<DialectType>,
1040) -> Result<Expression> {
1041    if let Expression::Select(ref select) = scope_expr {
1042        match column {
1043            ColumnRef::Name(name) => {
1044                let normalized_name = normalize_column_name(name, dialect);
1045                for expr in &select.expressions {
1046                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1047                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1048                            return Ok(expr.clone());
1049                        }
1050                    }
1051                }
1052                Err(crate::error::Error::parse(
1053                    format!("Cannot find column '{}' in query", name),
1054                    0,
1055                    0,
1056                    0,
1057                    0,
1058                ))
1059            }
1060            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1061                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1062            }),
1063        }
1064    } else {
1065        Err(crate::error::Error::parse(
1066            "Expected SELECT expression for column lookup",
1067            0,
1068            0,
1069            0,
1070            0,
1071        ))
1072    }
1073}
1074
1075/// Find the positional index of a column name in a set operation's first SELECT branch.
1076fn column_to_index(
1077    set_op_expr: &Expression,
1078    name: &str,
1079    dialect: Option<DialectType>,
1080) -> Result<usize> {
1081    let normalized_name = normalize_column_name(name, dialect);
1082    let mut expr = set_op_expr;
1083    loop {
1084        match expr {
1085            Expression::Union(u) => expr = &u.left,
1086            Expression::Intersect(i) => expr = &i.left,
1087            Expression::Except(e) => expr = &e.left,
1088            Expression::Select(select) => {
1089                for (i, e) in select.expressions.iter().enumerate() {
1090                    if let Some(alias_or_name) = get_alias_or_name(e) {
1091                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1092                            return Ok(i);
1093                        }
1094                    }
1095                }
1096                return Err(crate::error::Error::parse(
1097                    format!("Cannot find column '{}' in set operation", name),
1098                    0,
1099                    0,
1100                    0,
1101                    0,
1102                ));
1103            }
1104            _ => {
1105                return Err(crate::error::Error::parse(
1106                    "Expected SELECT or set operation",
1107                    0,
1108                    0,
1109                    0,
1110                    0,
1111                ))
1112            }
1113        }
1114    }
1115}
1116
1117fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1118    normalize_name(name, dialect, false, true)
1119}
1120
1121/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1122fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1123    if let Expression::Select(select) = select_expr {
1124        let mut trimmed = select.as_ref().clone();
1125        trimmed.expressions = vec![target_expr.clone()];
1126        Expression::Select(Box::new(trimmed))
1127    } else {
1128        select_expr.clone()
1129    }
1130}
1131
1132/// Find the child scope (CTE or derived table) for a given source name.
1133fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1134    // Check CTE scopes
1135    if scope.cte_sources.contains_key(source_name) {
1136        for cte_scope in &scope.cte_scopes {
1137            if let Expression::Cte(cte) = &cte_scope.expression {
1138                if cte.alias.name == source_name {
1139                    return Some(cte_scope);
1140                }
1141            }
1142        }
1143    }
1144
1145    // Check derived table scopes
1146    if let Some(source_info) = scope.sources.get(source_name) {
1147        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1148            if let Expression::Subquery(sq) = &source_info.expression {
1149                for dt_scope in &scope.derived_table_scopes {
1150                    if dt_scope.expression == sq.this {
1151                        return Some(dt_scope);
1152                    }
1153                }
1154            }
1155        }
1156    }
1157
1158    None
1159}
1160
1161/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1162/// This handles nested CTEs where the current scope doesn't have the CTE scope
1163/// as a direct child but knows about it via cte_sources.
1164fn find_child_scope_in<'a>(
1165    all_cte_scopes: &[&'a Scope],
1166    scope: &'a Scope,
1167    source_name: &str,
1168) -> Option<&'a Scope> {
1169    // First try the scope's own cte_scopes
1170    for cte_scope in &scope.cte_scopes {
1171        if let Expression::Cte(cte) = &cte_scope.expression {
1172            if cte.alias.name == source_name {
1173                return Some(cte_scope);
1174            }
1175        }
1176    }
1177
1178    // Then search through all ancestor CTE scopes
1179    for cte_scope in all_cte_scopes {
1180        if let Expression::Cte(cte) = &cte_scope.expression {
1181            if cte.alias.name == source_name {
1182                return Some(cte_scope);
1183            }
1184        }
1185    }
1186
1187    // Fall back to derived table scopes
1188    if let Some(source_info) = scope.sources.get(source_name) {
1189        if source_info.is_scope {
1190            if let Expression::Subquery(sq) = &source_info.expression {
1191                for dt_scope in &scope.derived_table_scopes {
1192                    if dt_scope.expression == sq.this {
1193                        return Some(dt_scope);
1194                    }
1195                }
1196            }
1197        }
1198    }
1199
1200    None
1201}
1202
1203/// Create a terminal lineage node for a table.column reference.
1204fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1205    let mut node = LineageNode::new(
1206        format!("{}.{}", table, column),
1207        Expression::Column(Box::new(crate::expressions::Column {
1208            name: crate::expressions::Identifier::new(column.to_string()),
1209            table: Some(crate::expressions::Identifier::new(table.to_string())),
1210            join_mark: false,
1211            trailing_comments: vec![],
1212            span: None,
1213            inferred_type: None,
1214        })),
1215        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1216    );
1217    node.source_name = table.to_string();
1218    node
1219}
1220
1221fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1222    let mut parts: Vec<String> = Vec::new();
1223    if let Some(catalog) = &table_ref.catalog {
1224        parts.push(catalog.name.clone());
1225    }
1226    if let Some(schema) = &table_ref.schema {
1227        parts.push(schema.name.clone());
1228    }
1229    parts.push(table_ref.name.name.clone());
1230    parts.join(".")
1231}
1232
1233fn make_table_column_node_from_source(
1234    table_alias: &str,
1235    column: &str,
1236    source: &Expression,
1237) -> LineageNode {
1238    let mut node = LineageNode::new(
1239        format!("{}.{}", table_alias, column),
1240        Expression::Column(Box::new(crate::expressions::Column {
1241            name: crate::expressions::Identifier::new(column.to_string()),
1242            table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1243            join_mark: false,
1244            trailing_comments: vec![],
1245            span: None,
1246            inferred_type: None,
1247        })),
1248        source.clone(),
1249    );
1250
1251    if let Expression::Table(table_ref) = source {
1252        node.source_name = table_name_from_table_ref(table_ref);
1253    } else {
1254        node.source_name = table_alias.to_string();
1255    }
1256
1257    node
1258}
1259
1260/// Simple column reference extracted from an expression
1261#[derive(Debug, Clone)]
1262struct SimpleColumnRef {
1263    table: Option<crate::expressions::Identifier>,
1264    column: String,
1265}
1266
1267/// Find all column references in an expression (does not recurse into subqueries).
1268fn find_column_refs_in_expr(
1269    expr: &Expression,
1270    dialect: Option<DialectType>,
1271) -> Vec<SimpleColumnRef> {
1272    let mut refs = Vec::new();
1273    collect_column_refs(expr, dialect, &mut refs);
1274    refs
1275}
1276
1277fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
1278    match expr {
1279        Expression::Column(col) => {
1280            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
1281        }
1282        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
1283        _ => false,
1284    }
1285}
1286
1287fn collect_column_refs(
1288    expr: &Expression,
1289    dialect: Option<DialectType>,
1290    refs: &mut Vec<SimpleColumnRef>,
1291) {
1292    let mut stack: Vec<&Expression> = vec![expr];
1293
1294    while let Some(current) = stack.pop() {
1295        match current {
1296            // === Leaf: collect Column references ===
1297            Expression::Column(col) => {
1298                refs.push(SimpleColumnRef {
1299                    table: col.table.clone(),
1300                    column: col.name.name.clone(),
1301                });
1302            }
1303
1304            // === Boundary: don't recurse into subqueries (handled separately) ===
1305            Expression::Subquery(_) | Expression::Exists(_) => {}
1306
1307            // === BinaryOp variants: left, right ===
1308            Expression::And(op)
1309            | Expression::Or(op)
1310            | Expression::Eq(op)
1311            | Expression::Neq(op)
1312            | Expression::Lt(op)
1313            | Expression::Lte(op)
1314            | Expression::Gt(op)
1315            | Expression::Gte(op)
1316            | Expression::Add(op)
1317            | Expression::Sub(op)
1318            | Expression::Mul(op)
1319            | Expression::Div(op)
1320            | Expression::Mod(op)
1321            | Expression::BitwiseAnd(op)
1322            | Expression::BitwiseOr(op)
1323            | Expression::BitwiseXor(op)
1324            | Expression::BitwiseLeftShift(op)
1325            | Expression::BitwiseRightShift(op)
1326            | Expression::Concat(op)
1327            | Expression::Adjacent(op)
1328            | Expression::TsMatch(op)
1329            | Expression::PropertyEQ(op)
1330            | Expression::ArrayContainsAll(op)
1331            | Expression::ArrayContainedBy(op)
1332            | Expression::ArrayOverlaps(op)
1333            | Expression::JSONBContainsAllTopKeys(op)
1334            | Expression::JSONBContainsAnyTopKeys(op)
1335            | Expression::JSONBDeleteAtPath(op)
1336            | Expression::ExtendsLeft(op)
1337            | Expression::ExtendsRight(op)
1338            | Expression::Is(op)
1339            | Expression::MemberOf(op)
1340            | Expression::NullSafeEq(op)
1341            | Expression::NullSafeNeq(op)
1342            | Expression::Glob(op)
1343            | Expression::Match(op) => {
1344                stack.push(&op.left);
1345                stack.push(&op.right);
1346            }
1347
1348            // === UnaryOp variants: this ===
1349            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1350                stack.push(&u.this);
1351            }
1352
1353            // === UnaryFunc variants: this ===
1354            Expression::Upper(f)
1355            | Expression::Lower(f)
1356            | Expression::Length(f)
1357            | Expression::LTrim(f)
1358            | Expression::RTrim(f)
1359            | Expression::Reverse(f)
1360            | Expression::Abs(f)
1361            | Expression::Sqrt(f)
1362            | Expression::Cbrt(f)
1363            | Expression::Ln(f)
1364            | Expression::Exp(f)
1365            | Expression::Sign(f)
1366            | Expression::Date(f)
1367            | Expression::Time(f)
1368            | Expression::DateFromUnixDate(f)
1369            | Expression::UnixDate(f)
1370            | Expression::UnixSeconds(f)
1371            | Expression::UnixMillis(f)
1372            | Expression::UnixMicros(f)
1373            | Expression::TimeStrToDate(f)
1374            | Expression::DateToDi(f)
1375            | Expression::DiToDate(f)
1376            | Expression::TsOrDiToDi(f)
1377            | Expression::TsOrDsToDatetime(f)
1378            | Expression::TsOrDsToTimestamp(f)
1379            | Expression::YearOfWeek(f)
1380            | Expression::YearOfWeekIso(f)
1381            | Expression::Initcap(f)
1382            | Expression::Ascii(f)
1383            | Expression::Chr(f)
1384            | Expression::Soundex(f)
1385            | Expression::ByteLength(f)
1386            | Expression::Hex(f)
1387            | Expression::LowerHex(f)
1388            | Expression::Unicode(f)
1389            | Expression::Radians(f)
1390            | Expression::Degrees(f)
1391            | Expression::Sin(f)
1392            | Expression::Cos(f)
1393            | Expression::Tan(f)
1394            | Expression::Asin(f)
1395            | Expression::Acos(f)
1396            | Expression::Atan(f)
1397            | Expression::IsNan(f)
1398            | Expression::IsInf(f)
1399            | Expression::ArrayLength(f)
1400            | Expression::ArraySize(f)
1401            | Expression::Cardinality(f)
1402            | Expression::ArrayReverse(f)
1403            | Expression::ArrayDistinct(f)
1404            | Expression::ArrayFlatten(f)
1405            | Expression::ArrayCompact(f)
1406            | Expression::Explode(f)
1407            | Expression::ExplodeOuter(f)
1408            | Expression::ToArray(f)
1409            | Expression::MapFromEntries(f)
1410            | Expression::MapKeys(f)
1411            | Expression::MapValues(f)
1412            | Expression::JsonArrayLength(f)
1413            | Expression::JsonKeys(f)
1414            | Expression::JsonType(f)
1415            | Expression::ParseJson(f)
1416            | Expression::ToJson(f)
1417            | Expression::Typeof(f)
1418            | Expression::BitwiseCount(f)
1419            | Expression::Year(f)
1420            | Expression::Month(f)
1421            | Expression::Day(f)
1422            | Expression::Hour(f)
1423            | Expression::Minute(f)
1424            | Expression::Second(f)
1425            | Expression::DayOfWeek(f)
1426            | Expression::DayOfWeekIso(f)
1427            | Expression::DayOfMonth(f)
1428            | Expression::DayOfYear(f)
1429            | Expression::WeekOfYear(f)
1430            | Expression::Quarter(f)
1431            | Expression::Epoch(f)
1432            | Expression::EpochMs(f)
1433            | Expression::TimeStrToUnix(f)
1434            | Expression::SHA(f)
1435            | Expression::SHA1Digest(f)
1436            | Expression::TimeToUnix(f)
1437            | Expression::JSONBool(f)
1438            | Expression::Int64(f)
1439            | Expression::MD5NumberLower64(f)
1440            | Expression::MD5NumberUpper64(f)
1441            | Expression::DateStrToDate(f)
1442            | Expression::DateToDateStr(f) => {
1443                stack.push(&f.this);
1444            }
1445
1446            // === BinaryFunc variants: this, expression ===
1447            Expression::Power(f)
1448            | Expression::NullIf(f)
1449            | Expression::IfNull(f)
1450            | Expression::Nvl(f)
1451            | Expression::UnixToTimeStr(f)
1452            | Expression::Contains(f)
1453            | Expression::StartsWith(f)
1454            | Expression::EndsWith(f)
1455            | Expression::Levenshtein(f)
1456            | Expression::ModFunc(f)
1457            | Expression::Atan2(f)
1458            | Expression::IntDiv(f)
1459            | Expression::AddMonths(f)
1460            | Expression::MonthsBetween(f)
1461            | Expression::NextDay(f)
1462            | Expression::ArrayContains(f)
1463            | Expression::ArrayPosition(f)
1464            | Expression::ArrayAppend(f)
1465            | Expression::ArrayPrepend(f)
1466            | Expression::ArrayUnion(f)
1467            | Expression::ArrayExcept(f)
1468            | Expression::ArrayRemove(f)
1469            | Expression::StarMap(f)
1470            | Expression::MapFromArrays(f)
1471            | Expression::MapContainsKey(f)
1472            | Expression::ElementAt(f)
1473            | Expression::JsonMergePatch(f)
1474            | Expression::JSONBContains(f)
1475            | Expression::JSONBExtract(f) => {
1476                stack.push(&f.this);
1477                stack.push(&f.expression);
1478            }
1479
1480            // === VarArgFunc variants: expressions ===
1481            Expression::Greatest(f)
1482            | Expression::Least(f)
1483            | Expression::Coalesce(f)
1484            | Expression::ArrayConcat(f)
1485            | Expression::ArrayIntersect(f)
1486            | Expression::ArrayZip(f)
1487            | Expression::MapConcat(f)
1488            | Expression::JsonArray(f) => {
1489                for e in &f.expressions {
1490                    stack.push(e);
1491                }
1492            }
1493
1494            // === AggFunc variants: this, filter, having_max, limit ===
1495            Expression::Sum(f)
1496            | Expression::Avg(f)
1497            | Expression::Min(f)
1498            | Expression::Max(f)
1499            | Expression::ArrayAgg(f)
1500            | Expression::CountIf(f)
1501            | Expression::Stddev(f)
1502            | Expression::StddevPop(f)
1503            | Expression::StddevSamp(f)
1504            | Expression::Variance(f)
1505            | Expression::VarPop(f)
1506            | Expression::VarSamp(f)
1507            | Expression::Median(f)
1508            | Expression::Mode(f)
1509            | Expression::First(f)
1510            | Expression::Last(f)
1511            | Expression::AnyValue(f)
1512            | Expression::ApproxDistinct(f)
1513            | Expression::ApproxCountDistinct(f)
1514            | Expression::LogicalAnd(f)
1515            | Expression::LogicalOr(f)
1516            | Expression::Skewness(f)
1517            | Expression::ArrayConcatAgg(f)
1518            | Expression::ArrayUniqueAgg(f)
1519            | Expression::BoolXorAgg(f)
1520            | Expression::BitwiseAndAgg(f)
1521            | Expression::BitwiseOrAgg(f)
1522            | Expression::BitwiseXorAgg(f) => {
1523                stack.push(&f.this);
1524                if let Some(ref filter) = f.filter {
1525                    stack.push(filter);
1526                }
1527                if let Some((ref expr, _)) = f.having_max {
1528                    stack.push(expr);
1529                }
1530                if let Some(ref limit) = f.limit {
1531                    stack.push(limit);
1532                }
1533            }
1534
1535            // === Generic Function / AggregateFunction: args ===
1536            Expression::Function(func) => {
1537                for arg in &func.args {
1538                    stack.push(arg);
1539                }
1540            }
1541            Expression::AggregateFunction(func) => {
1542                for arg in &func.args {
1543                    stack.push(arg);
1544                }
1545                if let Some(ref filter) = func.filter {
1546                    stack.push(filter);
1547                }
1548                if let Some(ref limit) = func.limit {
1549                    stack.push(limit);
1550                }
1551            }
1552
1553            // === WindowFunction: this (skip Over for lineage purposes) ===
1554            Expression::WindowFunction(wf) => {
1555                stack.push(&wf.this);
1556            }
1557
1558            // === Containers and special expressions ===
1559            Expression::Alias(a) => {
1560                stack.push(&a.this);
1561            }
1562            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1563                stack.push(&c.this);
1564                if let Some(ref fmt) = c.format {
1565                    stack.push(fmt);
1566                }
1567                if let Some(ref def) = c.default {
1568                    stack.push(def);
1569                }
1570            }
1571            Expression::Paren(p) => {
1572                stack.push(&p.this);
1573            }
1574            Expression::Annotated(a) => {
1575                stack.push(&a.this);
1576            }
1577            Expression::Case(case) => {
1578                if let Some(ref operand) = case.operand {
1579                    stack.push(operand);
1580                }
1581                for (cond, result) in &case.whens {
1582                    stack.push(cond);
1583                    stack.push(result);
1584                }
1585                if let Some(ref else_expr) = case.else_ {
1586                    stack.push(else_expr);
1587                }
1588            }
1589            Expression::Collation(c) => {
1590                stack.push(&c.this);
1591            }
1592            Expression::In(i) => {
1593                stack.push(&i.this);
1594                for e in &i.expressions {
1595                    stack.push(e);
1596                }
1597                if let Some(ref q) = i.query {
1598                    stack.push(q);
1599                }
1600                if let Some(ref u) = i.unnest {
1601                    stack.push(u);
1602                }
1603            }
1604            Expression::Between(b) => {
1605                stack.push(&b.this);
1606                stack.push(&b.low);
1607                stack.push(&b.high);
1608            }
1609            Expression::IsNull(n) => {
1610                stack.push(&n.this);
1611            }
1612            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1613                stack.push(&t.this);
1614            }
1615            Expression::IsJson(j) => {
1616                stack.push(&j.this);
1617            }
1618            Expression::Like(l) | Expression::ILike(l) => {
1619                stack.push(&l.left);
1620                stack.push(&l.right);
1621                if let Some(ref esc) = l.escape {
1622                    stack.push(esc);
1623                }
1624            }
1625            Expression::SimilarTo(s) => {
1626                stack.push(&s.this);
1627                stack.push(&s.pattern);
1628                if let Some(ref esc) = s.escape {
1629                    stack.push(esc);
1630                }
1631            }
1632            Expression::Ordered(o) => {
1633                stack.push(&o.this);
1634            }
1635            Expression::Array(a) => {
1636                for e in &a.expressions {
1637                    stack.push(e);
1638                }
1639            }
1640            Expression::Tuple(t) => {
1641                for e in &t.expressions {
1642                    stack.push(e);
1643                }
1644            }
1645            Expression::Struct(s) => {
1646                for (_, e) in &s.fields {
1647                    stack.push(e);
1648                }
1649            }
1650            Expression::Subscript(s) => {
1651                stack.push(&s.this);
1652                stack.push(&s.index);
1653            }
1654            Expression::Dot(d) => {
1655                stack.push(&d.this);
1656            }
1657            Expression::MethodCall(m) => {
1658                if !matches!(dialect, Some(DialectType::BigQuery))
1659                    || !is_bigquery_safe_namespace_receiver(&m.this)
1660                {
1661                    stack.push(&m.this);
1662                }
1663                for arg in &m.args {
1664                    stack.push(arg);
1665                }
1666            }
1667            Expression::ArraySlice(s) => {
1668                stack.push(&s.this);
1669                if let Some(ref start) = s.start {
1670                    stack.push(start);
1671                }
1672                if let Some(ref end) = s.end {
1673                    stack.push(end);
1674                }
1675            }
1676            Expression::Lambda(l) => {
1677                stack.push(&l.body);
1678            }
1679            Expression::NamedArgument(n) => {
1680                stack.push(&n.value);
1681            }
1682            Expression::TryCatch(t) => {
1683                for stmt in &t.try_body {
1684                    stack.push(stmt);
1685                }
1686                if let Some(catch_body) = &t.catch_body {
1687                    for stmt in catch_body {
1688                        stack.push(stmt);
1689                    }
1690                }
1691            }
1692            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1693                stack.push(e);
1694            }
1695
1696            // === Custom function structs ===
1697            Expression::Substring(f) => {
1698                stack.push(&f.this);
1699                stack.push(&f.start);
1700                if let Some(ref len) = f.length {
1701                    stack.push(len);
1702                }
1703            }
1704            Expression::Trim(f) => {
1705                stack.push(&f.this);
1706                if let Some(ref chars) = f.characters {
1707                    stack.push(chars);
1708                }
1709            }
1710            Expression::Replace(f) => {
1711                stack.push(&f.this);
1712                stack.push(&f.old);
1713                stack.push(&f.new);
1714            }
1715            Expression::IfFunc(f) => {
1716                stack.push(&f.condition);
1717                stack.push(&f.true_value);
1718                if let Some(ref fv) = f.false_value {
1719                    stack.push(fv);
1720                }
1721            }
1722            Expression::Nvl2(f) => {
1723                stack.push(&f.this);
1724                stack.push(&f.true_value);
1725                stack.push(&f.false_value);
1726            }
1727            Expression::ConcatWs(f) => {
1728                stack.push(&f.separator);
1729                for e in &f.expressions {
1730                    stack.push(e);
1731                }
1732            }
1733            Expression::Count(f) => {
1734                if let Some(ref this) = f.this {
1735                    stack.push(this);
1736                }
1737                if let Some(ref filter) = f.filter {
1738                    stack.push(filter);
1739                }
1740            }
1741            Expression::GroupConcat(f) => {
1742                stack.push(&f.this);
1743                if let Some(ref sep) = f.separator {
1744                    stack.push(sep);
1745                }
1746                if let Some(ref filter) = f.filter {
1747                    stack.push(filter);
1748                }
1749            }
1750            Expression::StringAgg(f) => {
1751                stack.push(&f.this);
1752                if let Some(ref sep) = f.separator {
1753                    stack.push(sep);
1754                }
1755                if let Some(ref filter) = f.filter {
1756                    stack.push(filter);
1757                }
1758                if let Some(ref limit) = f.limit {
1759                    stack.push(limit);
1760                }
1761            }
1762            Expression::ListAgg(f) => {
1763                stack.push(&f.this);
1764                if let Some(ref sep) = f.separator {
1765                    stack.push(sep);
1766                }
1767                if let Some(ref filter) = f.filter {
1768                    stack.push(filter);
1769                }
1770            }
1771            Expression::SumIf(f) => {
1772                stack.push(&f.this);
1773                stack.push(&f.condition);
1774                if let Some(ref filter) = f.filter {
1775                    stack.push(filter);
1776                }
1777            }
1778            Expression::DateAdd(f) | Expression::DateSub(f) => {
1779                stack.push(&f.this);
1780                stack.push(&f.interval);
1781            }
1782            Expression::DateDiff(f) => {
1783                stack.push(&f.this);
1784                stack.push(&f.expression);
1785            }
1786            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1787                stack.push(&f.this);
1788            }
1789            Expression::Extract(f) => {
1790                stack.push(&f.this);
1791            }
1792            Expression::Round(f) => {
1793                stack.push(&f.this);
1794                if let Some(ref d) = f.decimals {
1795                    stack.push(d);
1796                }
1797            }
1798            Expression::Floor(f) => {
1799                stack.push(&f.this);
1800                if let Some(ref s) = f.scale {
1801                    stack.push(s);
1802                }
1803                if let Some(ref t) = f.to {
1804                    stack.push(t);
1805                }
1806            }
1807            Expression::Ceil(f) => {
1808                stack.push(&f.this);
1809                if let Some(ref d) = f.decimals {
1810                    stack.push(d);
1811                }
1812                if let Some(ref t) = f.to {
1813                    stack.push(t);
1814                }
1815            }
1816            Expression::Log(f) => {
1817                stack.push(&f.this);
1818                if let Some(ref b) = f.base {
1819                    stack.push(b);
1820                }
1821            }
1822            Expression::AtTimeZone(f) => {
1823                stack.push(&f.this);
1824                stack.push(&f.zone);
1825            }
1826            Expression::Lead(f) | Expression::Lag(f) => {
1827                stack.push(&f.this);
1828                if let Some(ref off) = f.offset {
1829                    stack.push(off);
1830                }
1831                if let Some(ref def) = f.default {
1832                    stack.push(def);
1833                }
1834            }
1835            Expression::FirstValue(f) | Expression::LastValue(f) => {
1836                stack.push(&f.this);
1837            }
1838            Expression::NthValue(f) => {
1839                stack.push(&f.this);
1840                stack.push(&f.offset);
1841            }
1842            Expression::Position(f) => {
1843                stack.push(&f.substring);
1844                stack.push(&f.string);
1845                if let Some(ref start) = f.start {
1846                    stack.push(start);
1847                }
1848            }
1849            Expression::Decode(f) => {
1850                stack.push(&f.this);
1851                for (search, result) in &f.search_results {
1852                    stack.push(search);
1853                    stack.push(result);
1854                }
1855                if let Some(ref def) = f.default {
1856                    stack.push(def);
1857                }
1858            }
1859            Expression::CharFunc(f) => {
1860                for arg in &f.args {
1861                    stack.push(arg);
1862                }
1863            }
1864            Expression::ArraySort(f) => {
1865                stack.push(&f.this);
1866                if let Some(ref cmp) = f.comparator {
1867                    stack.push(cmp);
1868                }
1869            }
1870            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1871                stack.push(&f.this);
1872                stack.push(&f.separator);
1873                if let Some(ref nr) = f.null_replacement {
1874                    stack.push(nr);
1875                }
1876            }
1877            Expression::ArrayFilter(f) => {
1878                stack.push(&f.this);
1879                stack.push(&f.filter);
1880            }
1881            Expression::ArrayTransform(f) => {
1882                stack.push(&f.this);
1883                stack.push(&f.transform);
1884            }
1885            Expression::Sequence(f)
1886            | Expression::Generate(f)
1887            | Expression::ExplodingGenerateSeries(f) => {
1888                stack.push(&f.start);
1889                stack.push(&f.stop);
1890                if let Some(ref step) = f.step {
1891                    stack.push(step);
1892                }
1893            }
1894            Expression::JsonExtract(f)
1895            | Expression::JsonExtractScalar(f)
1896            | Expression::JsonQuery(f)
1897            | Expression::JsonValue(f) => {
1898                stack.push(&f.this);
1899                stack.push(&f.path);
1900            }
1901            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1902                stack.push(&f.this);
1903                for p in &f.paths {
1904                    stack.push(p);
1905                }
1906            }
1907            Expression::JsonObject(f) => {
1908                for (k, v) in &f.pairs {
1909                    stack.push(k);
1910                    stack.push(v);
1911                }
1912            }
1913            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1914                stack.push(&f.this);
1915                for (path, val) in &f.path_values {
1916                    stack.push(path);
1917                    stack.push(val);
1918                }
1919            }
1920            Expression::Overlay(f) => {
1921                stack.push(&f.this);
1922                stack.push(&f.replacement);
1923                stack.push(&f.from);
1924                if let Some(ref len) = f.length {
1925                    stack.push(len);
1926                }
1927            }
1928            Expression::Convert(f) => {
1929                stack.push(&f.this);
1930                if let Some(ref style) = f.style {
1931                    stack.push(style);
1932                }
1933            }
1934            Expression::ApproxPercentile(f) => {
1935                stack.push(&f.this);
1936                stack.push(&f.percentile);
1937                if let Some(ref acc) = f.accuracy {
1938                    stack.push(acc);
1939                }
1940                if let Some(ref filter) = f.filter {
1941                    stack.push(filter);
1942                }
1943            }
1944            Expression::Percentile(f)
1945            | Expression::PercentileCont(f)
1946            | Expression::PercentileDisc(f) => {
1947                stack.push(&f.this);
1948                stack.push(&f.percentile);
1949                if let Some(ref filter) = f.filter {
1950                    stack.push(filter);
1951                }
1952            }
1953            Expression::WithinGroup(f) => {
1954                stack.push(&f.this);
1955            }
1956            Expression::Left(f) | Expression::Right(f) => {
1957                stack.push(&f.this);
1958                stack.push(&f.length);
1959            }
1960            Expression::Repeat(f) => {
1961                stack.push(&f.this);
1962                stack.push(&f.times);
1963            }
1964            Expression::Lpad(f) | Expression::Rpad(f) => {
1965                stack.push(&f.this);
1966                stack.push(&f.length);
1967                if let Some(ref fill) = f.fill {
1968                    stack.push(fill);
1969                }
1970            }
1971            Expression::Split(f) => {
1972                stack.push(&f.this);
1973                stack.push(&f.delimiter);
1974            }
1975            Expression::RegexpLike(f) => {
1976                stack.push(&f.this);
1977                stack.push(&f.pattern);
1978                if let Some(ref flags) = f.flags {
1979                    stack.push(flags);
1980                }
1981            }
1982            Expression::RegexpReplace(f) => {
1983                stack.push(&f.this);
1984                stack.push(&f.pattern);
1985                stack.push(&f.replacement);
1986                if let Some(ref flags) = f.flags {
1987                    stack.push(flags);
1988                }
1989            }
1990            Expression::RegexpExtract(f) => {
1991                stack.push(&f.this);
1992                stack.push(&f.pattern);
1993                if let Some(ref group) = f.group {
1994                    stack.push(group);
1995                }
1996            }
1997            Expression::ToDate(f) => {
1998                stack.push(&f.this);
1999                if let Some(ref fmt) = f.format {
2000                    stack.push(fmt);
2001                }
2002            }
2003            Expression::ToTimestamp(f) => {
2004                stack.push(&f.this);
2005                if let Some(ref fmt) = f.format {
2006                    stack.push(fmt);
2007                }
2008            }
2009            Expression::DateFormat(f) | Expression::FormatDate(f) => {
2010                stack.push(&f.this);
2011                stack.push(&f.format);
2012            }
2013            Expression::LastDay(f) => {
2014                stack.push(&f.this);
2015            }
2016            Expression::FromUnixtime(f) => {
2017                stack.push(&f.this);
2018                if let Some(ref fmt) = f.format {
2019                    stack.push(fmt);
2020                }
2021            }
2022            Expression::UnixTimestamp(f) => {
2023                if let Some(ref this) = f.this {
2024                    stack.push(this);
2025                }
2026                if let Some(ref fmt) = f.format {
2027                    stack.push(fmt);
2028                }
2029            }
2030            Expression::MakeDate(f) => {
2031                stack.push(&f.year);
2032                stack.push(&f.month);
2033                stack.push(&f.day);
2034            }
2035            Expression::MakeTimestamp(f) => {
2036                stack.push(&f.year);
2037                stack.push(&f.month);
2038                stack.push(&f.day);
2039                stack.push(&f.hour);
2040                stack.push(&f.minute);
2041                stack.push(&f.second);
2042                if let Some(ref tz) = f.timezone {
2043                    stack.push(tz);
2044                }
2045            }
2046            Expression::TruncFunc(f) => {
2047                stack.push(&f.this);
2048                if let Some(ref d) = f.decimals {
2049                    stack.push(d);
2050                }
2051            }
2052            Expression::ArrayFunc(f) => {
2053                for e in &f.expressions {
2054                    stack.push(e);
2055                }
2056            }
2057            Expression::Unnest(f) => {
2058                stack.push(&f.this);
2059                for e in &f.expressions {
2060                    stack.push(e);
2061                }
2062            }
2063            Expression::StructFunc(f) => {
2064                for (_, e) in &f.fields {
2065                    stack.push(e);
2066                }
2067            }
2068            Expression::StructExtract(f) => {
2069                stack.push(&f.this);
2070            }
2071            Expression::NamedStruct(f) => {
2072                for (k, v) in &f.pairs {
2073                    stack.push(k);
2074                    stack.push(v);
2075                }
2076            }
2077            Expression::MapFunc(f) => {
2078                for k in &f.keys {
2079                    stack.push(k);
2080                }
2081                for v in &f.values {
2082                    stack.push(v);
2083                }
2084            }
2085            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2086                stack.push(&f.this);
2087                stack.push(&f.transform);
2088            }
2089            Expression::JsonArrayAgg(f) => {
2090                stack.push(&f.this);
2091                if let Some(ref filter) = f.filter {
2092                    stack.push(filter);
2093                }
2094            }
2095            Expression::JsonObjectAgg(f) => {
2096                stack.push(&f.key);
2097                stack.push(&f.value);
2098                if let Some(ref filter) = f.filter {
2099                    stack.push(filter);
2100                }
2101            }
2102            Expression::NTile(f) => {
2103                if let Some(ref n) = f.num_buckets {
2104                    stack.push(n);
2105                }
2106            }
2107            Expression::Rand(f) => {
2108                if let Some(ref s) = f.seed {
2109                    stack.push(s);
2110                }
2111                if let Some(ref lo) = f.lower {
2112                    stack.push(lo);
2113                }
2114                if let Some(ref hi) = f.upper {
2115                    stack.push(hi);
2116                }
2117            }
2118            Expression::Any(q) | Expression::All(q) => {
2119                stack.push(&q.this);
2120                stack.push(&q.subquery);
2121            }
2122            Expression::Overlaps(o) => {
2123                if let Some(ref this) = o.this {
2124                    stack.push(this);
2125                }
2126                if let Some(ref expr) = o.expression {
2127                    stack.push(expr);
2128                }
2129                if let Some(ref ls) = o.left_start {
2130                    stack.push(ls);
2131                }
2132                if let Some(ref le) = o.left_end {
2133                    stack.push(le);
2134                }
2135                if let Some(ref rs) = o.right_start {
2136                    stack.push(rs);
2137                }
2138                if let Some(ref re) = o.right_end {
2139                    stack.push(re);
2140                }
2141            }
2142            Expression::Interval(i) => {
2143                if let Some(ref this) = i.this {
2144                    stack.push(this);
2145                }
2146            }
2147            Expression::TimeStrToTime(f) => {
2148                stack.push(&f.this);
2149                if let Some(ref zone) = f.zone {
2150                    stack.push(zone);
2151                }
2152            }
2153            Expression::JSONBExtractScalar(f) => {
2154                stack.push(&f.this);
2155                stack.push(&f.expression);
2156                if let Some(ref jt) = f.json_type {
2157                    stack.push(jt);
2158                }
2159            }
2160
2161            // === True leaves and non-expression-bearing nodes ===
2162            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2163            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2164            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2165            _ => {}
2166        }
2167    }
2168}
2169
2170// ---------------------------------------------------------------------------
2171// Tests
2172// ---------------------------------------------------------------------------
2173
2174#[cfg(test)]
2175mod tests {
2176    use super::*;
2177    use crate::dialects::{Dialect, DialectType};
2178    use crate::expressions::DataType;
2179    use crate::optimizer::annotate_types::annotate_types;
2180    use crate::parse_one;
2181    use crate::schema::{MappingSchema, Schema};
2182
2183    fn parse(sql: &str) -> Expression {
2184        let dialect = Dialect::get(DialectType::Generic);
2185        let ast = dialect.parse(sql).unwrap();
2186        ast.into_iter().next().unwrap()
2187    }
2188
2189    #[test]
2190    fn test_simple_lineage() {
2191        let expr = parse("SELECT a FROM t");
2192        let node = lineage("a", &expr, None, false).unwrap();
2193
2194        assert_eq!(node.name, "a");
2195        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2196        // Should trace to t.a
2197        let names = node.downstream_names();
2198        assert!(
2199            names.iter().any(|n| n == "t.a"),
2200            "Expected t.a in downstream, got: {:?}",
2201            names
2202        );
2203    }
2204
2205    #[test]
2206    fn test_lineage_walk() {
2207        let root = LineageNode {
2208            name: "col_a".to_string(),
2209            expression: Expression::Null(crate::expressions::Null),
2210            source: Expression::Null(crate::expressions::Null),
2211            downstream: vec![LineageNode::new(
2212                "t.a",
2213                Expression::Null(crate::expressions::Null),
2214                Expression::Null(crate::expressions::Null),
2215            )],
2216            source_name: String::new(),
2217            reference_node_name: String::new(),
2218        };
2219
2220        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2221        assert_eq!(names.len(), 2);
2222        assert_eq!(names[0], "col_a");
2223        assert_eq!(names[1], "t.a");
2224    }
2225
2226    #[test]
2227    fn test_aliased_column() {
2228        let expr = parse("SELECT a + 1 AS b FROM t");
2229        let node = lineage("b", &expr, None, false).unwrap();
2230
2231        assert_eq!(node.name, "b");
2232        // Should trace through the expression to t.a
2233        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2234        assert!(
2235            all_names.iter().any(|n| n.contains("a")),
2236            "Expected to trace to column a, got: {:?}",
2237            all_names
2238        );
2239    }
2240
2241    #[test]
2242    fn test_qualified_column() {
2243        let expr = parse("SELECT t.a FROM t");
2244        let node = lineage("a", &expr, None, false).unwrap();
2245
2246        assert_eq!(node.name, "a");
2247        let names = node.downstream_names();
2248        assert!(
2249            names.iter().any(|n| n == "t.a"),
2250            "Expected t.a, got: {:?}",
2251            names
2252        );
2253    }
2254
2255    #[test]
2256    fn test_unqualified_column() {
2257        let expr = parse("SELECT a FROM t");
2258        let node = lineage("a", &expr, None, false).unwrap();
2259
2260        // Unqualified but single source → resolved to t.a
2261        let names = node.downstream_names();
2262        assert!(
2263            names.iter().any(|n| n == "t.a"),
2264            "Expected t.a, got: {:?}",
2265            names
2266        );
2267    }
2268
2269    #[test]
2270    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2271        let query = "SELECT name FROM users";
2272        let dialect = Dialect::get(DialectType::BigQuery);
2273        let expr = dialect
2274            .parse(query)
2275            .unwrap()
2276            .into_iter()
2277            .next()
2278            .expect("expected one expression");
2279
2280        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2281        schema
2282            .add_table("users", &[("name".into(), DataType::Text)], None)
2283            .expect("schema setup");
2284
2285        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2286            .expect("lineage without schema");
2287        let mut expr_without = node_without_schema.expression.clone();
2288        annotate_types(
2289            &mut expr_without,
2290            Some(&schema),
2291            Some(DialectType::BigQuery),
2292        );
2293        assert_eq!(
2294            expr_without.inferred_type(),
2295            None,
2296            "Expected unresolved root type without schema-aware lineage qualification"
2297        );
2298
2299        let node_with_schema = lineage_with_schema(
2300            "name",
2301            &expr,
2302            Some(&schema),
2303            Some(DialectType::BigQuery),
2304            false,
2305        )
2306        .expect("lineage with schema");
2307        let mut expr_with = node_with_schema.expression.clone();
2308        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2309
2310        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2311    }
2312
2313    #[test]
2314    fn test_lineage_with_schema_correlated_scalar_subquery() {
2315        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2316        let dialect = Dialect::get(DialectType::BigQuery);
2317        let expr = dialect
2318            .parse(query)
2319            .unwrap()
2320            .into_iter()
2321            .next()
2322            .expect("expected one expression");
2323
2324        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2325        schema
2326            .add_table(
2327                "t1",
2328                &[("id".into(), DataType::BigInt { length: None })],
2329                None,
2330            )
2331            .expect("schema setup");
2332        schema
2333            .add_table(
2334                "t2",
2335                &[
2336                    ("id".into(), DataType::BigInt { length: None }),
2337                    ("val".into(), DataType::BigInt { length: None }),
2338                ],
2339                None,
2340            )
2341            .expect("schema setup");
2342
2343        let node = lineage_with_schema(
2344            "id",
2345            &expr,
2346            Some(&schema),
2347            Some(DialectType::BigQuery),
2348            false,
2349        )
2350        .expect("lineage_with_schema should handle correlated scalar subqueries");
2351
2352        assert_eq!(node.name, "id");
2353    }
2354
2355    #[test]
2356    fn test_lineage_with_schema_join_using() {
2357        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2358        let dialect = Dialect::get(DialectType::BigQuery);
2359        let expr = dialect
2360            .parse(query)
2361            .unwrap()
2362            .into_iter()
2363            .next()
2364            .expect("expected one expression");
2365
2366        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2367        schema
2368            .add_table(
2369                "t1",
2370                &[("a".into(), DataType::BigInt { length: None })],
2371                None,
2372            )
2373            .expect("schema setup");
2374        schema
2375            .add_table(
2376                "t2",
2377                &[("a".into(), DataType::BigInt { length: None })],
2378                None,
2379            )
2380            .expect("schema setup");
2381
2382        let node = lineage_with_schema(
2383            "a",
2384            &expr,
2385            Some(&schema),
2386            Some(DialectType::BigQuery),
2387            false,
2388        )
2389        .expect("lineage_with_schema should handle JOIN USING");
2390
2391        assert_eq!(node.name, "a");
2392    }
2393
2394    #[test]
2395    fn test_lineage_with_schema_qualified_table_name() {
2396        let query = "SELECT a FROM raw.t1";
2397        let dialect = Dialect::get(DialectType::BigQuery);
2398        let expr = dialect
2399            .parse(query)
2400            .unwrap()
2401            .into_iter()
2402            .next()
2403            .expect("expected one expression");
2404
2405        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2406        schema
2407            .add_table(
2408                "raw.t1",
2409                &[("a".into(), DataType::BigInt { length: None })],
2410                None,
2411            )
2412            .expect("schema setup");
2413
2414        let node = lineage_with_schema(
2415            "a",
2416            &expr,
2417            Some(&schema),
2418            Some(DialectType::BigQuery),
2419            false,
2420        )
2421        .expect("lineage_with_schema should handle dotted schema.table names");
2422
2423        assert_eq!(node.name, "a");
2424    }
2425
2426    #[test]
2427    fn test_lineage_with_schema_none_matches_lineage() {
2428        let expr = parse("SELECT a FROM t");
2429        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2430        let with_none =
2431            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2432
2433        assert_eq!(with_none.name, baseline.name);
2434        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2435    }
2436
2437    #[test]
2438    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2439        let dialect = Dialect::get(DialectType::BigQuery);
2440        let expr = dialect
2441            .parse("SELECT Name AS name FROM teams")
2442            .unwrap()
2443            .into_iter()
2444            .next()
2445            .expect("expected one expression");
2446
2447        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2448        schema
2449            .add_table(
2450                "teams",
2451                &[("Name".into(), DataType::String { length: None })],
2452                None,
2453            )
2454            .expect("schema setup");
2455
2456        let node = lineage_with_schema(
2457            "name",
2458            &expr,
2459            Some(&schema),
2460            Some(DialectType::BigQuery),
2461            false,
2462        )
2463        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2464
2465        let names = node.downstream_names();
2466        assert!(
2467            names.iter().any(|n| n == "teams.Name"),
2468            "Expected teams.Name in downstream, got: {:?}",
2469            names
2470        );
2471    }
2472
2473    #[test]
2474    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2475        let dialect = Dialect::get(DialectType::BigQuery);
2476        let expr = dialect
2477            .parse("SELECT Name AS Name FROM teams")
2478            .unwrap()
2479            .into_iter()
2480            .next()
2481            .expect("expected one expression");
2482
2483        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2484            .expect("lineage should resolve mixed-case aliases in BigQuery");
2485
2486        assert_eq!(node.name, "name");
2487    }
2488
2489    #[test]
2490    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2491        let expr = parse_one(
2492            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2493            DialectType::Snowflake,
2494        )
2495        .expect("parse");
2496
2497        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2498        schema
2499            .add_table(
2500                "fact.some_daily_metrics",
2501                &[("date_utc".to_string(), DataType::Date)],
2502                None,
2503            )
2504            .expect("schema setup");
2505
2506        let node = lineage_with_schema(
2507            "recency",
2508            &expr,
2509            Some(&schema),
2510            Some(DialectType::Snowflake),
2511            false,
2512        )
2513        .expect("lineage_with_schema should not treat date part as a column");
2514
2515        let names = node.downstream_names();
2516        assert!(
2517            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2518            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2519            names
2520        );
2521        assert!(
2522            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2523            "Did not expect date part to appear as lineage column, got: {:?}",
2524            names
2525        );
2526    }
2527
2528    #[test]
2529    fn test_snowflake_datediff_parses_to_typed_ast() {
2530        let expr = parse_one(
2531            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2532            DialectType::Snowflake,
2533        )
2534        .expect("parse");
2535
2536        match expr {
2537            Expression::Select(select) => match &select.expressions[0] {
2538                Expression::Alias(alias) => match &alias.this {
2539                    Expression::DateDiff(f) => {
2540                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2541                    }
2542                    other => panic!("expected DateDiff, got {other:?}"),
2543                },
2544                other => panic!("expected Alias, got {other:?}"),
2545            },
2546            other => panic!("expected Select, got {other:?}"),
2547        }
2548    }
2549
2550    #[test]
2551    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2552        let expr = parse_one(
2553            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2554            DialectType::Snowflake,
2555        )
2556        .expect("parse");
2557
2558        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2559        schema
2560            .add_table(
2561                "fact.some_daily_metrics",
2562                &[("date_utc".to_string(), DataType::Date)],
2563                None,
2564            )
2565            .expect("schema setup");
2566
2567        let node = lineage_with_schema(
2568            "next_day",
2569            &expr,
2570            Some(&schema),
2571            Some(DialectType::Snowflake),
2572            false,
2573        )
2574        .expect("lineage_with_schema should not treat DATEADD date part as a column");
2575
2576        let names = node.downstream_names();
2577        assert!(
2578            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2579            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2580            names
2581        );
2582        assert!(
2583            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2584            "Did not expect date part to appear as lineage column, got: {:?}",
2585            names
2586        );
2587    }
2588
2589    #[test]
2590    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2591        let expr = parse_one(
2592            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2593            DialectType::Snowflake,
2594        )
2595        .expect("parse");
2596
2597        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2598        schema
2599            .add_table(
2600                "fact.some_daily_metrics",
2601                &[("date_utc".to_string(), DataType::Date)],
2602                None,
2603            )
2604            .expect("schema setup");
2605
2606        let node = lineage_with_schema(
2607            "day_part",
2608            &expr,
2609            Some(&schema),
2610            Some(DialectType::Snowflake),
2611            false,
2612        )
2613        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2614
2615        let names = node.downstream_names();
2616        assert!(
2617            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2618            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2619            names
2620        );
2621        assert!(
2622            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2623            "Did not expect date part to appear as lineage column, got: {:?}",
2624            names
2625        );
2626    }
2627
2628    #[test]
2629    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2630        let expr = parse_one(
2631            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2632            DialectType::Snowflake,
2633        )
2634        .expect("parse");
2635
2636        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2637        schema
2638            .add_table(
2639                "fact.some_daily_metrics",
2640                &[("date_utc".to_string(), DataType::Date)],
2641                None,
2642            )
2643            .expect("schema setup");
2644
2645        let node = lineage_with_schema(
2646            "day_part",
2647            &expr,
2648            Some(&schema),
2649            Some(DialectType::Snowflake),
2650            false,
2651        )
2652        .expect("quoted DATE_PART should continue to work");
2653
2654        let names = node.downstream_names();
2655        assert!(
2656            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2657            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2658            names
2659        );
2660    }
2661
2662    #[test]
2663    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2664        let expr = parse_one(
2665            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2666            DialectType::Snowflake,
2667        )
2668        .expect("parse");
2669
2670        match expr {
2671            Expression::Select(select) => match &select.expressions[0] {
2672                Expression::Alias(alias) => match &alias.this {
2673                    Expression::Function(f) => {
2674                        assert_eq!(f.name.to_uppercase(), "DATEADD");
2675                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2676                    }
2677                    other => panic!("expected generic DATEADD function, got {other:?}"),
2678                },
2679                other => panic!("expected Alias, got {other:?}"),
2680            },
2681            other => panic!("expected Select, got {other:?}"),
2682        }
2683    }
2684
2685    #[test]
2686    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2687        let expr = parse_one(
2688            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2689            DialectType::Snowflake,
2690        )
2691        .expect("parse");
2692
2693        match expr {
2694            Expression::Select(select) => match &select.expressions[0] {
2695                Expression::Alias(alias) => match &alias.this {
2696                    Expression::Function(f) => {
2697                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2698                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2699                    }
2700                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2701                },
2702                other => panic!("expected Alias, got {other:?}"),
2703            },
2704            other => panic!("expected Select, got {other:?}"),
2705        }
2706    }
2707
2708    #[test]
2709    fn test_snowflake_date_part_string_literal_stays_generic_function() {
2710        let expr = parse_one(
2711            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2712            DialectType::Snowflake,
2713        )
2714        .expect("parse");
2715
2716        match expr {
2717            Expression::Select(select) => match &select.expressions[0] {
2718                Expression::Alias(alias) => match &alias.this {
2719                    Expression::Function(f) => {
2720                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2721                    }
2722                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2723                },
2724                other => panic!("expected Alias, got {other:?}"),
2725            },
2726            other => panic!("expected Select, got {other:?}"),
2727        }
2728    }
2729
2730    #[test]
2731    fn test_lineage_join() {
2732        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2733
2734        let node_a = lineage("a", &expr, None, false).unwrap();
2735        let names_a = node_a.downstream_names();
2736        assert!(
2737            names_a.iter().any(|n| n == "t.a"),
2738            "Expected t.a, got: {:?}",
2739            names_a
2740        );
2741
2742        let node_b = lineage("b", &expr, None, false).unwrap();
2743        let names_b = node_b.downstream_names();
2744        assert!(
2745            names_b.iter().any(|n| n == "s.b"),
2746            "Expected s.b, got: {:?}",
2747            names_b
2748        );
2749    }
2750
2751    #[test]
2752    fn test_lineage_alias_leaf_has_resolved_source_name() {
2753        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2754        let node = lineage("col1", &expr, None, false).unwrap();
2755
2756        // Keep alias in the display lineage edge.
2757        let names = node.downstream_names();
2758        assert!(
2759            names.iter().any(|n| n == "t1.col1"),
2760            "Expected aliased column edge t1.col1, got: {:?}",
2761            names
2762        );
2763
2764        // Leaf should expose the resolved base table for consumers.
2765        let leaf = node
2766            .downstream
2767            .iter()
2768            .find(|n| n.name == "t1.col1")
2769            .expect("Expected t1.col1 leaf");
2770        assert_eq!(leaf.source_name, "table1");
2771        match &leaf.source {
2772            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2773            _ => panic!("Expected leaf source to be a table expression"),
2774        }
2775    }
2776
2777    #[test]
2778    fn test_lineage_derived_table() {
2779        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2780        let node = lineage("a", &expr, None, false).unwrap();
2781
2782        assert_eq!(node.name, "a");
2783        // Should trace through the derived table to t.a
2784        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2785        assert!(
2786            all_names.iter().any(|n| n == "t.a"),
2787            "Expected to trace through derived table to t.a, got: {:?}",
2788            all_names
2789        );
2790    }
2791
2792    #[test]
2793    fn test_lineage_cte() {
2794        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2795        let node = lineage("a", &expr, None, false).unwrap();
2796
2797        assert_eq!(node.name, "a");
2798        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2799        assert!(
2800            all_names.iter().any(|n| n == "t.a"),
2801            "Expected to trace through CTE to t.a, got: {:?}",
2802            all_names
2803        );
2804    }
2805
2806    #[test]
2807    fn test_lineage_union() {
2808        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2809        let node = lineage("a", &expr, None, false).unwrap();
2810
2811        assert_eq!(node.name, "a");
2812        // Should have 2 downstream branches
2813        assert_eq!(
2814            node.downstream.len(),
2815            2,
2816            "Expected 2 branches for UNION, got {}",
2817            node.downstream.len()
2818        );
2819    }
2820
2821    #[test]
2822    fn test_lineage_cte_union() {
2823        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2824        let node = lineage("a", &expr, None, false).unwrap();
2825
2826        // Should trace through CTE into both UNION branches
2827        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2828        assert!(
2829            all_names.len() >= 3,
2830            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2831            all_names
2832        );
2833    }
2834
2835    #[test]
2836    fn test_lineage_star() {
2837        let expr = parse("SELECT * FROM t");
2838        let node = lineage("*", &expr, None, false).unwrap();
2839
2840        assert_eq!(node.name, "*");
2841        // Should have downstream for table t
2842        assert!(
2843            !node.downstream.is_empty(),
2844            "Star should produce downstream nodes"
2845        );
2846    }
2847
2848    #[test]
2849    fn test_lineage_subquery_in_select() {
2850        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2851        let node = lineage("x", &expr, None, false).unwrap();
2852
2853        assert_eq!(node.name, "x");
2854        // Should have traced into the scalar subquery
2855        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2856        assert!(
2857            all_names.len() >= 2,
2858            "Expected tracing into scalar subquery, got: {:?}",
2859            all_names
2860        );
2861    }
2862
2863    #[test]
2864    fn test_lineage_multiple_columns() {
2865        let expr = parse("SELECT a, b FROM t");
2866
2867        let node_a = lineage("a", &expr, None, false).unwrap();
2868        let node_b = lineage("b", &expr, None, false).unwrap();
2869
2870        assert_eq!(node_a.name, "a");
2871        assert_eq!(node_b.name, "b");
2872
2873        // Each should trace independently
2874        let names_a = node_a.downstream_names();
2875        let names_b = node_b.downstream_names();
2876        assert!(names_a.iter().any(|n| n == "t.a"));
2877        assert!(names_b.iter().any(|n| n == "t.b"));
2878    }
2879
2880    #[test]
2881    fn test_get_source_tables() {
2882        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2883        let node = lineage("a", &expr, None, false).unwrap();
2884
2885        let tables = get_source_tables(&node);
2886        assert!(
2887            tables.contains("t"),
2888            "Expected source table 't', got: {:?}",
2889            tables
2890        );
2891    }
2892
2893    #[test]
2894    fn test_lineage_column_not_found() {
2895        let expr = parse("SELECT a FROM t");
2896        let result = lineage("nonexistent", &expr, None, false);
2897        assert!(result.is_err());
2898    }
2899
2900    #[test]
2901    fn test_lineage_nested_cte() {
2902        let expr = parse(
2903            "WITH cte1 AS (SELECT a FROM t), \
2904             cte2 AS (SELECT a FROM cte1) \
2905             SELECT a FROM cte2",
2906        );
2907        let node = lineage("a", &expr, None, false).unwrap();
2908
2909        // Should trace through cte2 → cte1 → t
2910        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2911        assert!(
2912            all_names.len() >= 3,
2913            "Expected to trace through nested CTEs, got: {:?}",
2914            all_names
2915        );
2916    }
2917
2918    #[test]
2919    fn test_trim_selects_true() {
2920        let expr = parse("SELECT a, b, c FROM t");
2921        let node = lineage("a", &expr, None, true).unwrap();
2922
2923        // The source should be trimmed to only include 'a'
2924        if let Expression::Select(select) = &node.source {
2925            assert_eq!(
2926                select.expressions.len(),
2927                1,
2928                "Trimmed source should have 1 expression, got {}",
2929                select.expressions.len()
2930            );
2931        } else {
2932            panic!("Expected Select source");
2933        }
2934    }
2935
2936    #[test]
2937    fn test_trim_selects_false() {
2938        let expr = parse("SELECT a, b, c FROM t");
2939        let node = lineage("a", &expr, None, false).unwrap();
2940
2941        // The source should keep all columns
2942        if let Expression::Select(select) = &node.source {
2943            assert_eq!(
2944                select.expressions.len(),
2945                3,
2946                "Untrimmed source should have 3 expressions"
2947            );
2948        } else {
2949            panic!("Expected Select source");
2950        }
2951    }
2952
2953    #[test]
2954    fn test_lineage_expression_in_select() {
2955        let expr = parse("SELECT a + b AS c FROM t");
2956        let node = lineage("c", &expr, None, false).unwrap();
2957
2958        // Should trace to both a and b from t
2959        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2960        assert!(
2961            all_names.len() >= 3,
2962            "Expected to trace a + b to both columns, got: {:?}",
2963            all_names
2964        );
2965    }
2966
2967    #[test]
2968    fn test_set_operation_by_index() {
2969        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2970
2971        // Trace column "a" which is at index 0
2972        let node = lineage("a", &expr, None, false).unwrap();
2973
2974        // UNION branches should be traced by index
2975        assert_eq!(node.downstream.len(), 2);
2976    }
2977
2978    // --- Tests for column lineage inside function calls (issue #18) ---
2979
2980    fn print_node(node: &LineageNode, indent: usize) {
2981        let pad = "  ".repeat(indent);
2982        println!(
2983            "{pad}name={:?} source_name={:?}",
2984            node.name, node.source_name
2985        );
2986        for child in &node.downstream {
2987            print_node(child, indent + 1);
2988        }
2989    }
2990
2991    #[test]
2992    fn test_issue18_repro() {
2993        // Exact scenario from the issue
2994        let query = "SELECT UPPER(name) as upper_name FROM users";
2995        println!("Query: {query}\n");
2996
2997        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2998        let exprs = dialect.parse(query).unwrap();
2999        let expr = &exprs[0];
3000
3001        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
3002        println!("lineage(\"upper_name\"):");
3003        print_node(&node, 1);
3004
3005        let names = node.downstream_names();
3006        assert!(
3007            names.iter().any(|n| n == "users.name"),
3008            "Expected users.name in downstream, got: {:?}",
3009            names
3010        );
3011    }
3012
3013    #[test]
3014    fn test_lineage_bigquery_safe_namespace_issue207() {
3015        let query = r#"
3016WITH import_cte AS (
3017  SELECT timestamp, data, operation
3018  FROM `project`.`dataset`.`source_table`
3019),
3020transform_cte AS (
3021  SELECT
3022    timestamp,
3023    SAFE.PARSE_JSON(data) AS json_data
3024  FROM import_cte
3025)
3026SELECT json_data FROM transform_cte
3027"#;
3028        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
3029        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3030            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3031        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3032
3033        assert!(
3034            names.iter().any(|name| name == "source_table.data"),
3035            "expected source_table.data in lineage, got {names:?}"
3036        );
3037        assert!(
3038            !names
3039                .iter()
3040                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
3041            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3042        );
3043    }
3044
3045    #[test]
3046    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
3047        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
3048        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
3049            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
3050        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3051
3052        assert!(
3053            names.iter().any(|name| name == "t.data"),
3054            "expected t.data in lineage, got {names:?}"
3055        );
3056        assert!(
3057            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
3058            "did not expect SAFE namespace receiver in lineage, got {names:?}"
3059        );
3060    }
3061
3062    #[test]
3063    fn test_lineage_method_call_receiver_control() {
3064        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
3065        let node = lineage("out", &expr, None, false).expect("lineage");
3066        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3067
3068        assert!(
3069            names.iter().any(|name| name == "t.obj"),
3070            "expected ordinary method receiver to remain in lineage, got {names:?}"
3071        );
3072        assert!(
3073            names.iter().any(|name| name == "t.arg"),
3074            "expected method argument in lineage, got {names:?}"
3075        );
3076    }
3077
3078    #[test]
3079    fn test_lineage_upper_function() {
3080        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
3081        let node = lineage("upper_name", &expr, None, false).unwrap();
3082
3083        let names = node.downstream_names();
3084        assert!(
3085            names.iter().any(|n| n == "users.name"),
3086            "Expected users.name in downstream, got: {:?}",
3087            names
3088        );
3089    }
3090
3091    #[test]
3092    fn test_lineage_round_function() {
3093        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3094        let node = lineage("rounded", &expr, None, false).unwrap();
3095
3096        let names = node.downstream_names();
3097        assert!(
3098            names.iter().any(|n| n == "products.price"),
3099            "Expected products.price in downstream, got: {:?}",
3100            names
3101        );
3102    }
3103
3104    #[test]
3105    fn test_lineage_coalesce_function() {
3106        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3107        let node = lineage("val", &expr, None, false).unwrap();
3108
3109        let names = node.downstream_names();
3110        assert!(
3111            names.iter().any(|n| n == "t.a"),
3112            "Expected t.a in downstream, got: {:?}",
3113            names
3114        );
3115        assert!(
3116            names.iter().any(|n| n == "t.b"),
3117            "Expected t.b in downstream, got: {:?}",
3118            names
3119        );
3120    }
3121
3122    #[test]
3123    fn test_lineage_count_function() {
3124        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3125        let node = lineage("cnt", &expr, None, false).unwrap();
3126
3127        let names = node.downstream_names();
3128        assert!(
3129            names.iter().any(|n| n == "t.id"),
3130            "Expected t.id in downstream, got: {:?}",
3131            names
3132        );
3133    }
3134
3135    #[test]
3136    fn test_lineage_sum_function() {
3137        let expr = parse("SELECT SUM(amount) AS total FROM t");
3138        let node = lineage("total", &expr, None, false).unwrap();
3139
3140        let names = node.downstream_names();
3141        assert!(
3142            names.iter().any(|n| n == "t.amount"),
3143            "Expected t.amount in downstream, got: {:?}",
3144            names
3145        );
3146    }
3147
3148    #[test]
3149    fn test_lineage_case_with_nested_functions() {
3150        let expr =
3151            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3152        let node = lineage("result", &expr, None, false).unwrap();
3153
3154        let names = node.downstream_names();
3155        assert!(
3156            names.iter().any(|n| n == "t.x"),
3157            "Expected t.x in downstream, got: {:?}",
3158            names
3159        );
3160        assert!(
3161            names.iter().any(|n| n == "t.name"),
3162            "Expected t.name in downstream, got: {:?}",
3163            names
3164        );
3165    }
3166
3167    #[test]
3168    fn test_lineage_substring_function() {
3169        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3170        let node = lineage("short", &expr, None, false).unwrap();
3171
3172        let names = node.downstream_names();
3173        assert!(
3174            names.iter().any(|n| n == "t.name"),
3175            "Expected t.name in downstream, got: {:?}",
3176            names
3177        );
3178    }
3179
3180    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3181
3182    #[test]
3183    fn test_lineage_cte_select_star() {
3184        // Ported from sqlglot: test_lineage_source_with_star
3185        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3186        // After star expansion: SELECT y.a AS a FROM y
3187        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3188        let node = lineage("a", &expr, None, false).unwrap();
3189
3190        assert_eq!(node.name, "a");
3191        // Should successfully resolve column 'a' through the CTE
3192        // (previously failed with "Cannot find column 'a' in query")
3193        assert!(
3194            !node.downstream.is_empty(),
3195            "Expected downstream nodes tracing through CTE, got none"
3196        );
3197    }
3198
3199    #[test]
3200    fn test_lineage_cte_select_star_renamed_column() {
3201        // dbt standard pattern: CTE with column rename + outer SELECT *
3202        // This is the primary use case for dbt projects (jaffle-shop etc.)
3203        let expr =
3204            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3205        let node = lineage("customer_id", &expr, None, false).unwrap();
3206
3207        assert_eq!(node.name, "customer_id");
3208        // Should trace customer_id → renamed CTE → source.id
3209        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3210        assert!(
3211            all_names.len() >= 2,
3212            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3213            all_names
3214        );
3215    }
3216
3217    #[test]
3218    fn test_lineage_cte_select_star_multiple_columns() {
3219        // CTE exposes multiple columns, outer SELECT * should resolve each
3220        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3221
3222        for col in &["a", "b", "c"] {
3223            let node = lineage(col, &expr, None, false).unwrap();
3224            assert_eq!(node.name, *col);
3225            // Verify lineage resolves without error (star expanded to explicit columns)
3226            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3227            assert!(
3228                all_names.len() >= 2,
3229                "Expected at least 2 nodes for column {}, got: {:?}",
3230                col,
3231                all_names
3232            );
3233        }
3234    }
3235
3236    #[test]
3237    fn test_lineage_nested_cte_select_star() {
3238        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3239        let expr = parse(
3240            "WITH cte1 AS (SELECT a FROM t), \
3241             cte2 AS (SELECT * FROM cte1) \
3242             SELECT * FROM cte2",
3243        );
3244        let node = lineage("a", &expr, None, false).unwrap();
3245
3246        assert_eq!(node.name, "a");
3247        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3248        assert!(
3249            all_names.len() >= 3,
3250            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3251            all_names
3252        );
3253    }
3254
3255    #[test]
3256    fn test_lineage_three_level_nested_cte_star() {
3257        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3258        let expr = parse(
3259            "WITH cte1 AS (SELECT x FROM t), \
3260             cte2 AS (SELECT * FROM cte1), \
3261             cte3 AS (SELECT * FROM cte2) \
3262             SELECT * FROM cte3",
3263        );
3264        let node = lineage("x", &expr, None, false).unwrap();
3265
3266        assert_eq!(node.name, "x");
3267        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3268        assert!(
3269            all_names.len() >= 4,
3270            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3271            all_names
3272        );
3273    }
3274
3275    #[test]
3276    fn test_lineage_cte_union_star() {
3277        // CTE with UNION body, outer SELECT * should resolve from left branch
3278        let expr = parse(
3279            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3280             SELECT * FROM cte",
3281        );
3282        let node = lineage("a", &expr, None, false).unwrap();
3283
3284        assert_eq!(node.name, "a");
3285        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3286        assert!(
3287            all_names.len() >= 2,
3288            "Expected at least 2 nodes for CTE union star, got: {:?}",
3289            all_names
3290        );
3291    }
3292
3293    #[test]
3294    fn test_lineage_cte_star_unknown_table() {
3295        // When CTE references an unknown table, star expansion is skipped gracefully
3296        // and lineage falls back to normal resolution (which may fail)
3297        let expr = parse(
3298            "WITH cte AS (SELECT * FROM unknown_table) \
3299             SELECT * FROM cte",
3300        );
3301        // This should not panic — it may succeed or fail depending on resolution,
3302        // but should not crash
3303        let _result = lineage("x", &expr, None, false);
3304    }
3305
3306    #[test]
3307    fn test_lineage_cte_explicit_columns() {
3308        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3309        let expr = parse(
3310            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3311             SELECT * FROM cte",
3312        );
3313        let node = lineage("x", &expr, None, false).unwrap();
3314        assert_eq!(node.name, "x");
3315    }
3316
3317    #[test]
3318    fn test_lineage_cte_qualified_star() {
3319        // Qualified star: SELECT cte.* FROM cte
3320        let expr = parse(
3321            "WITH cte AS (SELECT a, b FROM t) \
3322             SELECT cte.* FROM cte",
3323        );
3324        for col in &["a", "b"] {
3325            let node = lineage(col, &expr, None, false).unwrap();
3326            assert_eq!(node.name, *col);
3327            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3328            assert!(
3329                all_names.len() >= 2,
3330                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3331                col,
3332                all_names
3333            );
3334        }
3335    }
3336
3337    #[test]
3338    fn test_lineage_subquery_select_star() {
3339        // Ported from sqlglot: test_select_star
3340        // SELECT x FROM (SELECT * FROM table_a)
3341        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3342        let node = lineage("x", &expr, None, false).unwrap();
3343
3344        assert_eq!(node.name, "x");
3345        assert!(
3346            !node.downstream.is_empty(),
3347            "Expected downstream nodes for subquery with SELECT *, got none"
3348        );
3349    }
3350
3351    #[test]
3352    fn test_lineage_cte_star_with_schema_external_table() {
3353        // CTE references an external table via SELECT * — schema enables expansion
3354        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3355SELECT * FROM orders"#;
3356        let expr = parse(sql);
3357
3358        let mut schema = MappingSchema::new();
3359        let cols = vec![
3360            ("order_id".to_string(), DataType::Unknown),
3361            ("customer_id".to_string(), DataType::Unknown),
3362            ("amount".to_string(), DataType::Unknown),
3363        ];
3364        schema.add_table("stg_orders", &cols, None).unwrap();
3365
3366        let node =
3367            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3368                .unwrap();
3369        assert_eq!(node.name, "order_id");
3370    }
3371
3372    #[test]
3373    fn test_lineage_cte_star_with_schema_three_part_name() {
3374        // CTE references an external table with fully-qualified 3-part name
3375        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3376SELECT * FROM orders"#;
3377        let expr = parse(sql);
3378
3379        let mut schema = MappingSchema::new();
3380        let cols = vec![
3381            ("order_id".to_string(), DataType::Unknown),
3382            ("customer_id".to_string(), DataType::Unknown),
3383        ];
3384        schema
3385            .add_table("db.schema.stg_orders", &cols, None)
3386            .unwrap();
3387
3388        let node = lineage_with_schema(
3389            "customer_id",
3390            &expr,
3391            Some(&schema as &dyn Schema),
3392            None,
3393            false,
3394        )
3395        .unwrap();
3396        assert_eq!(node.name, "customer_id");
3397    }
3398
3399    #[test]
3400    fn test_lineage_cte_star_with_schema_nested() {
3401        // Nested CTEs: outer CTE references inner CTE with SELECT *,
3402        // inner CTE references external table with SELECT *
3403        let sql = r#"WITH
3404            raw AS (SELECT * FROM external_table),
3405            enriched AS (SELECT * FROM raw)
3406        SELECT * FROM enriched"#;
3407        let expr = parse(sql);
3408
3409        let mut schema = MappingSchema::new();
3410        let cols = vec![
3411            ("id".to_string(), DataType::Unknown),
3412            ("name".to_string(), DataType::Unknown),
3413        ];
3414        schema.add_table("external_table", &cols, None).unwrap();
3415
3416        let node =
3417            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3418        assert_eq!(node.name, "name");
3419    }
3420
3421    #[test]
3422    fn test_lineage_cte_qualified_star_with_schema() {
3423        // CTE uses qualified star (orders.*) from a CTE whose columns
3424        // come from an external table via SELECT *
3425        let sql = r#"WITH
3426            orders AS (SELECT * FROM stg_orders),
3427            enriched AS (
3428                SELECT orders.*, 'extra' AS extra
3429                FROM orders
3430            )
3431        SELECT * FROM enriched"#;
3432        let expr = parse(sql);
3433
3434        let mut schema = MappingSchema::new();
3435        let cols = vec![
3436            ("order_id".to_string(), DataType::Unknown),
3437            ("total".to_string(), DataType::Unknown),
3438        ];
3439        schema.add_table("stg_orders", &cols, None).unwrap();
3440
3441        let node =
3442            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3443                .unwrap();
3444        assert_eq!(node.name, "order_id");
3445
3446        // Also verify the extra column works
3447        let extra =
3448            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3449        assert_eq!(extra.name, "extra");
3450    }
3451
3452    #[test]
3453    fn test_lineage_cte_star_without_schema_still_works() {
3454        // Without schema, CTE-to-CTE star expansion still works
3455        let sql = r#"WITH
3456            cte1 AS (SELECT id, name FROM raw_table),
3457            cte2 AS (SELECT * FROM cte1)
3458        SELECT * FROM cte2"#;
3459        let expr = parse(sql);
3460
3461        // No schema — should still resolve through CTE chain
3462        let node = lineage("id", &expr, None, false).unwrap();
3463        assert_eq!(node.name, "id");
3464    }
3465
3466    #[test]
3467    fn test_lineage_nested_cte_star_with_join_and_schema() {
3468        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
3469        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
3470        let sql = r#"WITH
3471base_orders AS (
3472    SELECT * FROM stg_orders
3473),
3474with_payments AS (
3475    SELECT
3476        base_orders.*,
3477        p.amount
3478    FROM base_orders
3479    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3480),
3481final_cte AS (
3482    SELECT * FROM with_payments
3483)
3484SELECT * FROM final_cte"#;
3485        let expr = parse(sql);
3486
3487        let mut schema = MappingSchema::new();
3488        let order_cols = vec![
3489            (
3490                "order_id".to_string(),
3491                crate::expressions::DataType::Unknown,
3492            ),
3493            (
3494                "customer_id".to_string(),
3495                crate::expressions::DataType::Unknown,
3496            ),
3497            ("status".to_string(), crate::expressions::DataType::Unknown),
3498        ];
3499        let pay_cols = vec![
3500            (
3501                "payment_id".to_string(),
3502                crate::expressions::DataType::Unknown,
3503            ),
3504            (
3505                "order_id".to_string(),
3506                crate::expressions::DataType::Unknown,
3507            ),
3508            ("amount".to_string(), crate::expressions::DataType::Unknown),
3509        ];
3510        schema.add_table("stg_orders", &order_cols, None).unwrap();
3511        schema.add_table("stg_payments", &pay_cols, None).unwrap();
3512
3513        // order_id should trace back to stg_orders
3514        let node =
3515            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3516                .unwrap();
3517        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3518
3519        // The leaf should be "stg_orders.order_id" (not just "order_id")
3520        let has_table_qualified = all_names
3521            .iter()
3522            .any(|n| n.contains('.') && n.contains("order_id"));
3523        assert!(
3524            has_table_qualified,
3525            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3526            all_names
3527        );
3528
3529        // amount should trace back to stg_payments
3530        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3531            .unwrap();
3532        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3533
3534        let has_table_qualified = all_names
3535            .iter()
3536            .any(|n| n.contains('.') && n.contains("amount"));
3537        assert!(
3538            has_table_qualified,
3539            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3540            all_names
3541        );
3542    }
3543
3544    #[test]
3545    fn test_lineage_cte_alias_resolution() {
3546        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
3547        let sql = r#"WITH import_stg_items AS (
3548    SELECT item_id, name, status FROM stg_items
3549)
3550SELECT base.item_id, base.status
3551FROM import_stg_items AS base"#;
3552        let expr = parse(sql);
3553
3554        let node = lineage("item_id", &expr, None, false).unwrap();
3555        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3556        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
3557        assert!(
3558            all_names.iter().any(|n| n == "stg_items.item_id"),
3559            "Expected leaf 'stg_items.item_id', got: {:?}",
3560            all_names
3561        );
3562    }
3563
3564    #[test]
3565    fn test_lineage_cte_alias_with_schema_and_star() {
3566        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
3567        let sql = r#"WITH import_stg AS (
3568    SELECT * FROM stg_items
3569)
3570SELECT base.item_id, base.status
3571FROM import_stg AS base"#;
3572        let expr = parse(sql);
3573
3574        let mut schema = MappingSchema::new();
3575        schema
3576            .add_table(
3577                "stg_items",
3578                &[
3579                    ("item_id".to_string(), DataType::Unknown),
3580                    ("name".to_string(), DataType::Unknown),
3581                    ("status".to_string(), DataType::Unknown),
3582                ],
3583                None,
3584            )
3585            .unwrap();
3586
3587        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3588            .unwrap();
3589        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3590        assert!(
3591            all_names.iter().any(|n| n == "stg_items.item_id"),
3592            "Expected leaf 'stg_items.item_id', got: {:?}",
3593            all_names
3594        );
3595    }
3596
3597    #[test]
3598    fn test_lineage_cte_alias_with_join() {
3599        // Multiple CTE aliases in a JOIN: each should resolve independently
3600        let sql = r#"WITH
3601    import_users AS (SELECT id, name FROM users),
3602    import_orders AS (SELECT id, user_id, amount FROM orders)
3603SELECT u.name, o.amount
3604FROM import_users AS u
3605LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3606        let expr = parse(sql);
3607
3608        let node = lineage("name", &expr, None, false).unwrap();
3609        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3610        assert!(
3611            all_names.iter().any(|n| n == "users.name"),
3612            "Expected leaf 'users.name', got: {:?}",
3613            all_names
3614        );
3615
3616        let node = lineage("amount", &expr, None, false).unwrap();
3617        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3618        assert!(
3619            all_names.iter().any(|n| n == "orders.amount"),
3620            "Expected leaf 'orders.amount', got: {:?}",
3621            all_names
3622        );
3623    }
3624
    // -----------------------------------------------------------------------
    // Quoted CTE name tests — verifying SQL identifier case semantics
    // -----------------------------------------------------------------------
3628
3629    #[test]
3630    fn test_lineage_unquoted_cte_case_insensitive() {
3631        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
3632        // MyCte and MYCTE should match.
3633        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3634        let node = lineage("col", &expr, None, false).unwrap();
3635        assert_eq!(node.name, "col");
3636        assert!(
3637            !node.downstream.is_empty(),
3638            "Unquoted CTE should resolve case-insensitively"
3639        );
3640    }
3641
3642    #[test]
3643    fn test_lineage_quoted_cte_case_preserved() {
3644        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
3645        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3646        let node = lineage("col", &expr, None, false).unwrap();
3647        assert_eq!(node.name, "col");
3648        assert!(
3649            !node.downstream.is_empty(),
3650            "Quoted CTE with matching case should resolve"
3651        );
3652    }
3653
3654    #[test]
3655    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3656        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
3657        // sqlglot treats this as a table reference, not a CTE match.
3658        // Star expansion should NOT resolve through the CTE.
3659        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3660        // lineage("col", ...) should fail because "mycte" is treated as an external
3661        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
3662        let result = lineage("col", &expr, None, false);
3663        assert!(
3664            result.is_err(),
3665            "Quoted CTE with case mismatch should not expand star: {:?}",
3666            result
3667        );
3668    }
3669
3670    #[test]
3671    fn test_lineage_mixed_quoted_unquoted_cte() {
3672        // Mix of unquoted and quoted CTEs in a nested chain.
3673        let expr = parse(
3674            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3675        );
3676        let node = lineage("a", &expr, None, false).unwrap();
3677        assert_eq!(node.name, "a");
3678        assert!(
3679            !node.downstream.is_empty(),
3680            "Mixed quoted/unquoted CTE chain should resolve"
3681        );
3682    }
3683
    // -----------------------------------------------------------------------
    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
    // -----------------------------------------------------------------------
    //
    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
    // direct string comparison for CTE name matching, ignoring the quoted status.
    //
    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
    // and unquoted as case-insensitive. The scope system should do the same.
    //
    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
    // which is broader than the star expansion scope of this PR.
3698
3699    #[test]
3700    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3701        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
3702        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
3703        // as "mycte" incorrectly resolves to the CTE.
3704        //
3705        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
3706        // case-sensitive: "mycte" should NOT match CTE "MyCte".
3707        //
3708        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3709        // this test should fail — update the assertion to match correct behavior:
3710        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
3711        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3712        let node = lineage("col", &expr, None, false).unwrap();
3713        assert!(!node.downstream.is_empty());
3714        let child = &node.downstream[0];
3715        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
3716        assert_eq!(
3717            child.source_name, "MyCte",
3718            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3719             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3720        );
3721    }
3722
3723    #[test]
3724    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3725        // Known bug: same as above but with qualified column reference ("mycte".col).
3726        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
3727        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
3728        //
3729        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3730        // this test should fail — update to assert source_name != "MyCte".
3731        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
3732        let node = lineage("col", &expr, None, false).unwrap();
3733        assert!(!node.downstream.is_empty());
3734        let child = &node.downstream[0];
3735        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
3736        assert_eq!(
3737            child.source_name, "MyCte",
3738            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3739             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3740        );
3741    }
3742
    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
3744
3745    /// sqlglot: test_node_name_doesnt_contain_comment
3746    /// Comments in column expressions should not affect lineage resolution.
3747    /// NOTE: This test uses SELECT * from a derived table, which is a separate
3748    /// known limitation in polyglot-sql (star expansion in subqueries).
3749    #[test]
3750    #[ignore = "requires derived table star expansion (separate issue)"]
3751    fn test_node_name_doesnt_contain_comment() {
3752        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3753        let node = lineage("x", &expr, None, false).unwrap();
3754
3755        assert_eq!(node.name, "x");
3756        assert!(!node.downstream.is_empty());
3757    }
3758
3759    /// A line comment between SELECT and the first column wraps the column
3760    /// in an Annotated node. Lineage must unwrap it to find the column name.
3761    /// Verify that commented and uncommented queries produce identical lineage.
3762    #[test]
3763    fn test_comment_before_first_column_in_cte() {
3764        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
3765        let sql_without_comment = "with t as (select 1 as a) select a from t";
3766
3767        // Without comment — baseline
3768        let expr_ok = parse(sql_without_comment);
3769        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3770
3771        // With comment — should produce identical lineage
3772        let expr_comment = parse(sql_with_comment);
3773        let node_comment = lineage("a", &expr_comment, None, false)
3774            .expect("with comment before first column should succeed");
3775
3776        assert_eq!(node_ok.name, node_comment.name, "node names should match");
3777        assert_eq!(
3778            node_ok.downstream_names(),
3779            node_comment.downstream_names(),
3780            "downstream lineage should be identical with or without comment"
3781        );
3782    }
3783
3784    /// Block comment between SELECT and first column.
3785    #[test]
3786    fn test_block_comment_before_first_column() {
3787        let sql = "with t as (select 1 as a) select /* section */ a from t";
3788        let expr = parse(sql);
3789        let node = lineage("a", &expr, None, false)
3790            .expect("block comment before first column should succeed");
3791        assert_eq!(node.name, "a");
3792        assert!(
3793            !node.downstream.is_empty(),
3794            "should have downstream lineage"
3795        );
3796    }
3797
3798    /// Comment before first column should not affect second column resolution.
3799    #[test]
3800    fn test_comment_before_first_column_second_col_ok() {
3801        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
3802        let expr = parse(sql);
3803
3804        let node_a =
3805            lineage("a", &expr, None, false).expect("column a with comment should succeed");
3806        assert_eq!(node_a.name, "a");
3807
3808        let node_b =
3809            lineage("b", &expr, None, false).expect("column b with comment should succeed");
3810        assert_eq!(node_b.name, "b");
3811    }
3812
3813    /// Aliased column with preceding comment.
3814    #[test]
3815    fn test_comment_before_aliased_column() {
3816        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
3817        let expr = parse(sql);
3818        let node =
3819            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3820        assert_eq!(node.name, "y");
3821        assert!(
3822            !node.downstream.is_empty(),
3823            "aliased column should have downstream lineage"
3824        );
3825    }
3826}