Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Optional reference node name (e.g., for CTEs)
33    pub reference_node_name: String,
34}
35
36impl LineageNode {
37    /// Create a new lineage node
38    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39        Self {
40            name: name.into(),
41            expression,
42            source,
43            downstream: Vec::new(),
44            source_name: String::new(),
45            reference_node_name: String::new(),
46        }
47    }
48
49    /// Iterate over all nodes in the lineage graph using DFS
50    pub fn walk(&self) -> LineageWalker<'_> {
51        LineageWalker { stack: vec![self] }
52    }
53
54    /// Get all downstream column names
55    pub fn downstream_names(&self) -> Vec<String> {
56        self.downstream.iter().map(|n| n.name.clone()).collect()
57    }
58}
59
60/// Iterator for walking the lineage graph
61pub struct LineageWalker<'a> {
62    stack: Vec<&'a LineageNode>,
63}
64
65impl<'a> Iterator for LineageWalker<'a> {
66    type Item = &'a LineageNode;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        if let Some(node) = self.stack.pop() {
70            // Add children in reverse order so they're visited in order
71            for child in node.downstream.iter().rev() {
72                self.stack.push(child);
73            }
74            Some(node)
75        } else {
76            None
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// ColumnRef: name or positional index for column lookup
83// ---------------------------------------------------------------------------
84
/// How a column is identified while tracing lineage.
enum ColumnRef<'a> {
    /// Look the column up by its output name.
    Name(&'a str),
    /// Look the column up by its zero-based position in the select list
    /// (used for set-operation branches and scalar subqueries).
    Index(usize),
}
90
91// ---------------------------------------------------------------------------
92// Public API
93// ---------------------------------------------------------------------------
94
95/// Build the lineage graph for a column in a SQL query
96///
97/// # Arguments
98/// * `column` - The column name to trace lineage for
99/// * `sql` - The SQL expression (SELECT, UNION, etc.)
100/// * `dialect` - Optional dialect for parsing
101/// * `trim_selects` - If true, trim the source SELECT to only include the target column
102///
103/// # Returns
104/// The root lineage node for the specified column
105///
106/// # Example
107/// ```ignore
108/// use polyglot_sql::lineage::lineage;
109/// use polyglot_sql::parse_one;
110/// use polyglot_sql::DialectType;
111///
112/// let sql = "SELECT a, b + 1 AS c FROM t";
113/// let expr = parse_one(sql, DialectType::Generic).unwrap();
114/// let node = lineage("c", &expr, None, false).unwrap();
115/// ```
116pub fn lineage(
117    column: &str,
118    sql: &Expression,
119    dialect: Option<DialectType>,
120    trim_selects: bool,
121) -> Result<LineageNode> {
122    // Fast path: skip clone when there are no CTEs to expand
123    let has_with = matches!(sql, Expression::Select(s) if s.with.is_some());
124    if !has_with {
125        return lineage_from_expression(column, sql, dialect, trim_selects);
126    }
127    let mut owned = sql.clone();
128    expand_cte_stars(&mut owned, None);
129    lineage_from_expression(column, &owned, dialect, trim_selects)
130}
131
132/// Build the lineage graph for a column in a SQL query using optional schema metadata.
133///
134/// When `schema` is provided, the query is first qualified with
135/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
136/// ambiguous column references.
137///
138/// # Arguments
139/// * `column` - The column name to trace lineage for
140/// * `sql` - The SQL expression (SELECT, UNION, etc.)
141/// * `schema` - Optional schema used for qualification
142/// * `dialect` - Optional dialect for qualification and lineage handling
143/// * `trim_selects` - If true, trim the source SELECT to only include the target column
144///
145/// # Returns
146/// The root lineage node for the specified column
147pub fn lineage_with_schema(
148    column: &str,
149    sql: &Expression,
150    schema: Option<&dyn Schema>,
151    dialect: Option<DialectType>,
152    trim_selects: bool,
153) -> Result<LineageNode> {
154    let mut qualified_expression = if let Some(schema) = schema {
155        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
156            QualifyColumnsOptions::new().with_dialect(dialect_type)
157        } else {
158            QualifyColumnsOptions::new()
159        };
160
161        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
162            Error::internal(format!("Lineage qualification failed with schema: {}", e))
163        })?
164    } else {
165        sql.clone()
166    };
167
168    // Annotate types in-place so lineage nodes carry type information
169    annotate_types(&mut qualified_expression, schema, dialect);
170
171    // Expand CTE stars on the already-owned expression (no extra clone).
172    // Pass schema so that stars from external tables can also be resolved.
173    expand_cte_stars(&mut qualified_expression, schema);
174
175    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
176}
177
178fn lineage_from_expression(
179    column: &str,
180    sql: &Expression,
181    dialect: Option<DialectType>,
182    trim_selects: bool,
183) -> Result<LineageNode> {
184    let scope = build_scope(sql);
185    to_node(
186        ColumnRef::Name(column),
187        &scope,
188        dialect,
189        "",
190        "",
191        "",
192        trim_selects,
193    )
194}
195
196// ---------------------------------------------------------------------------
197// CTE star expansion
198// ---------------------------------------------------------------------------
199
200/// Normalize an identifier for CTE name matching.
201///
202/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
203/// quoted identifiers preserve their original case. This matches sqlglot's
204/// `normalize_identifiers` behavior.
205fn normalize_cte_name(ident: &Identifier) -> String {
206    if ident.quoted {
207        ident.name.clone()
208    } else {
209        ident.name.to_lowercase()
210    }
211}
212
213/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
214/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
215/// which qualify_columns cannot resolve because it processes each SELECT independently.
216///
217/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
218/// by looking up column names in the schema. This enables correct expansion of patterns
219/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
220///
221/// CTE name matching follows SQL identifier semantics: unquoted names are compared
222/// case-insensitively (lowercased), while quoted names preserve their original case.
223/// This matches sqlglot's `normalize_identifiers` behavior.
224pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
225    let select = match expr {
226        Expression::Select(s) => s,
227        _ => return,
228    };
229
230    let with = match &mut select.with {
231        Some(w) => w,
232        None => return,
233    };
234
235    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
236
237    for cte in &mut with.ctes {
238        let cte_name = normalize_cte_name(&cte.alias);
239
240        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
241        if !cte.columns.is_empty() {
242            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
243            resolved_cte_columns.insert(cte_name, cols);
244            continue;
245        }
246
247        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
248        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
249        // references itself. We detect this conservatively: if the CTE name appears as
250        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
251        // block are still expanded.
252        if with.recursive {
253            let is_self_referencing =
254                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
255                    let body_sources = get_select_sources(body_select);
256                    body_sources.iter().any(|s| s.normalized == cte_name)
257                } else {
258                    false
259                };
260            if is_self_referencing {
261                continue;
262            }
263        }
264
265        // Get the SELECT from the CTE body (handle UNION by taking left branch)
266        let body_select = match get_leftmost_select_mut(&mut cte.this) {
267            Some(s) => s,
268            None => continue,
269        };
270
271        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
272        resolved_cte_columns.insert(cte_name, columns);
273    }
274
275    // Also expand stars in the outer SELECT itself
276    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
277}
278
279/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
280///
281/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
282/// are determined by the left branch. This matches sqlglot's behavior.
283fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
284    let mut current = expr;
285    for _ in 0..MAX_LINEAGE_DEPTH {
286        match current {
287            Expression::Select(s) => return Some(s),
288            Expression::Union(u) => current = &mut u.left,
289            Expression::Intersect(i) => current = &mut i.left,
290            Expression::Except(e) => current = &mut e.left,
291            Expression::Paren(p) => current = &mut p.this,
292            _ => return None,
293        }
294    }
295    None
296}
297
298/// Rewrite star expressions in a SELECT using resolved CTE column lists.
299/// Falls back to `schema` for external table column lookup.
300/// Returns the list of output column names after expansion.
301fn rewrite_stars_in_select(
302    select: &mut Select,
303    resolved_ctes: &HashMap<String, Vec<String>>,
304    schema: Option<&dyn Schema>,
305) -> Vec<String> {
306    // The AST represents star expressions in two forms depending on syntax:
307    //   - `SELECT *`      → Expression::Star (unqualified star)
308    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
309    // Both must be checked to handle all star patterns.
310    let has_star = select
311        .expressions
312        .iter()
313        .any(|e| matches!(e, Expression::Star(_)));
314    let has_qualified_star = select
315        .expressions
316        .iter()
317        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
318
319    if !has_star && !has_qualified_star {
320        // No stars — just extract column names without rewriting
321        return select
322            .expressions
323            .iter()
324            .filter_map(get_expression_output_name)
325            .collect();
326    }
327
328    let sources = get_select_sources(select);
329    let mut new_expressions = Vec::new();
330    let mut result_columns = Vec::new();
331
332    for expr in &select.expressions {
333        match expr {
334            Expression::Star(star) => {
335                let qual = star.table.as_ref();
336                if let Some(expanded) =
337                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
338                {
339                    for (src_alias, col_name) in &expanded {
340                        let table_id = Identifier::new(src_alias);
341                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
342                        result_columns.push(col_name.clone());
343                    }
344                } else {
345                    new_expressions.push(expr.clone());
346                    result_columns.push("*".to_string());
347                }
348            }
349            Expression::Column(c) if c.name.name == "*" => {
350                let qual = c.table.as_ref();
351                if let Some(expanded) =
352                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
353                {
354                    for (_src_alias, col_name) in &expanded {
355                        // Keep the original table qualifier for qualified stars (table.*)
356                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
357                        result_columns.push(col_name.clone());
358                    }
359                } else {
360                    new_expressions.push(expr.clone());
361                    result_columns.push("*".to_string());
362                }
363            }
364            _ => {
365                new_expressions.push(expr.clone());
366                if let Some(name) = get_expression_output_name(expr) {
367                    result_columns.push(name);
368                }
369            }
370        }
371    }
372
373    select.expressions = new_expressions;
374    result_columns
375}
376
377/// Try to expand a star expression by looking up source columns from resolved CTEs,
378/// falling back to the schema for external tables.
379/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
380/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
381fn expand_star_from_sources(
382    qualifier: Option<&Identifier>,
383    sources: &[SourceInfo],
384    resolved_ctes: &HashMap<String, Vec<String>>,
385    schema: Option<&dyn Schema>,
386) -> Option<Vec<(String, String)>> {
387    let mut expanded = Vec::new();
388
389    if let Some(qual) = qualifier {
390        // Qualified star: table.*
391        let qual_normalized = normalize_cte_name(qual);
392        for src in sources {
393            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
394                // Try CTE first
395                if let Some(cols) = resolved_ctes.get(&src.normalized) {
396                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
397                    return Some(expanded);
398                }
399                // Fall back to schema
400                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
401                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
402                    return Some(expanded);
403                }
404            }
405        }
406        None
407    } else {
408        // Unqualified star: expand all sources.
409        // Intentionally conservative: if any source can't be resolved, the entire
410        // expansion is aborted. Partial expansion would produce an incomplete column
411        // list, causing downstream lineage resolution to silently omit columns.
412        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
413        let mut any_expanded = false;
414        for src in sources {
415            if let Some(cols) = resolved_ctes.get(&src.normalized) {
416                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
417                any_expanded = true;
418            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
419                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
420                any_expanded = true;
421            } else {
422                return None;
423            }
424        }
425        if any_expanded {
426            Some(expanded)
427        } else {
428            None
429        }
430    }
431}
432
433/// Look up column names for a table from the schema.
434fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
435    let schema = schema?;
436    if fq_name.is_empty() {
437        return None;
438    }
439    schema
440        .column_names(fq_name)
441        .ok()
442        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
443}
444
445/// Create a Column expression with the given name and optional table qualifier.
446fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
447    Expression::Column(Box::new(crate::expressions::Column {
448        name: Identifier::new(name),
449        table: table.cloned(),
450        join_mark: false,
451        trailing_comments: Vec::new(),
452        span: None,
453        inferred_type: None,
454    }))
455}
456
457/// Extract the output name of a SELECT expression.
458fn get_expression_output_name(expr: &Expression) -> Option<String> {
459    match expr {
460        Expression::Alias(a) => Some(a.alias.name.clone()),
461        Expression::Column(c) => Some(c.name.name.clone()),
462        Expression::Identifier(id) => Some(id.name.clone()),
463        Expression::Star(_) => Some("*".to_string()),
464        _ => None,
465    }
466}
467
/// Description of one FROM/JOIN source of a SELECT, as collected by
/// `get_select_sources`.
struct SourceInfo {
    /// Name the source is referred to by in the query: its alias when it has
    /// one, otherwise the table name.
    alias: String,
    /// Key for CTE lookup: unquoted → lowercased, quoted → as-is.
    normalized: String,
    /// Dot-joined, fully-qualified table name for schema lookup
    /// (e.g., "db.schema.table").
    fq_name: String,
}
476
477/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
478/// SELECT's FROM and JOIN clauses in a single pass.
479fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
480    let mut sources = Vec::new();
481
482    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
483        match expr {
484            Expression::Table(t) => {
485                let normalized = normalize_cte_name(&t.name);
486                let alias = t
487                    .alias
488                    .as_ref()
489                    .map(|a| a.name.clone())
490                    .unwrap_or_else(|| t.name.name.clone());
491                let mut parts = Vec::new();
492                if let Some(catalog) = &t.catalog {
493                    parts.push(catalog.name.clone());
494                }
495                if let Some(schema) = &t.schema {
496                    parts.push(schema.name.clone());
497                }
498                parts.push(t.name.name.clone());
499                let fq_name = parts.join(".");
500                Some(SourceInfo {
501                    alias,
502                    normalized,
503                    fq_name,
504                })
505            }
506            Expression::Subquery(s) => {
507                let alias = s.alias.as_ref()?.name.clone();
508                let normalized = alias.to_lowercase();
509                let fq_name = alias.clone();
510                Some(SourceInfo {
511                    alias,
512                    normalized,
513                    fq_name,
514                })
515            }
516            Expression::Paren(p) => extract_source(&p.this),
517            _ => None,
518        }
519    }
520
521    if let Some(from) = &select.from {
522        for expr in &from.expressions {
523            if let Some(info) = extract_source(expr) {
524                sources.push(info);
525            }
526        }
527    }
528    for join in &select.joins {
529        if let Some(info) = extract_source(&join.this) {
530            sources.push(info);
531        }
532    }
533    sources
534}
535
536/// Get all source tables from a lineage graph
537pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
538    let mut tables = HashSet::new();
539    collect_source_tables(node, &mut tables);
540    tables
541}
542
543/// Recursively collect source table names from lineage graph
544pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
545    if let Expression::Table(table) = &node.source {
546        tables.insert(table.name.name.clone());
547    }
548    for child in &node.downstream {
549        collect_source_tables(child, tables);
550    }
551}
552
553// ---------------------------------------------------------------------------
554// Core recursive lineage builder
555// ---------------------------------------------------------------------------
556
/// Upper bound on lineage recursion depth. Tracing fails with an error once
/// this depth is exceeded, which guards against stack overflow on circular
/// or very deeply nested CTE chains. The same bound limits how far
/// `get_leftmost_select_mut` descends through nested set operations.
const MAX_LINEAGE_DEPTH: usize = 64;
560
561/// Recursively build a lineage node for a column in a scope.
562fn to_node(
563    column: ColumnRef<'_>,
564    scope: &Scope,
565    dialect: Option<DialectType>,
566    scope_name: &str,
567    source_name: &str,
568    reference_node_name: &str,
569    trim_selects: bool,
570) -> Result<LineageNode> {
571    to_node_inner(
572        column,
573        scope,
574        dialect,
575        scope_name,
576        source_name,
577        reference_node_name,
578        trim_selects,
579        &[],
580        0,
581    )
582}
583
584fn to_node_inner(
585    column: ColumnRef<'_>,
586    scope: &Scope,
587    dialect: Option<DialectType>,
588    scope_name: &str,
589    source_name: &str,
590    reference_node_name: &str,
591    trim_selects: bool,
592    ancestor_cte_scopes: &[Scope],
593    depth: usize,
594) -> Result<LineageNode> {
595    if depth > MAX_LINEAGE_DEPTH {
596        return Err(Error::internal(format!(
597            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
598        )));
599    }
600    let scope_expr = &scope.expression;
601
602    // Build combined CTE scopes: current scope's cte_scopes + ancestors
603    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
604    for s in ancestor_cte_scopes {
605        all_cte_scopes.push(s);
606    }
607
608    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
609    //    but we need the inner query (SELECT/UNION) for column lookup.
610    let effective_expr = match scope_expr {
611        Expression::Cte(cte) => &cte.this,
612        other => other,
613    };
614
615    // 1. Set operations (UNION / INTERSECT / EXCEPT)
616    if matches!(
617        effective_expr,
618        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
619    ) {
620        // For CTE wrapping a set op, create a temporary scope with the inner expression
621        if matches!(scope_expr, Expression::Cte(_)) {
622            let mut inner_scope = Scope::new(effective_expr.clone());
623            inner_scope.union_scopes = scope.union_scopes.clone();
624            inner_scope.sources = scope.sources.clone();
625            inner_scope.cte_sources = scope.cte_sources.clone();
626            inner_scope.cte_scopes = scope.cte_scopes.clone();
627            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
628            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
629            return handle_set_operation(
630                &column,
631                &inner_scope,
632                dialect,
633                scope_name,
634                source_name,
635                reference_node_name,
636                trim_selects,
637                ancestor_cte_scopes,
638                depth,
639            );
640        }
641        return handle_set_operation(
642            &column,
643            scope,
644            dialect,
645            scope_name,
646            source_name,
647            reference_node_name,
648            trim_selects,
649            ancestor_cte_scopes,
650            depth,
651        );
652    }
653
654    // 2. Find the select expression for this column
655    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
656    let column_name = resolve_column_name(&column, &select_expr);
657
658    // 3. Trim source if requested
659    let node_source = if trim_selects {
660        trim_source(effective_expr, &select_expr)
661    } else {
662        effective_expr.clone()
663    };
664
665    // 4. Create the lineage node
666    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
667    node.source_name = source_name.to_string();
668    node.reference_node_name = reference_node_name.to_string();
669
670    // 5. Star handling — add downstream for each source
671    if matches!(&select_expr, Expression::Star(_)) {
672        for (name, source_info) in &scope.sources {
673            let child = LineageNode::new(
674                format!("{}.*", name),
675                Expression::Star(crate::expressions::Star {
676                    table: None,
677                    except: None,
678                    replace: None,
679                    rename: None,
680                    trailing_comments: vec![],
681                    span: None,
682                }),
683                source_info.expression.clone(),
684            );
685            node.downstream.push(child);
686        }
687        return Ok(node);
688    }
689
690    // 6. Subqueries in select — trace through scalar subqueries
691    let subqueries: Vec<&Expression> =
692        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
693    for sq_expr in subqueries {
694        if let Expression::Subquery(sq) = sq_expr {
695            for sq_scope in &scope.subquery_scopes {
696                if sq_scope.expression == sq.this {
697                    if let Ok(child) = to_node_inner(
698                        ColumnRef::Index(0),
699                        sq_scope,
700                        dialect,
701                        &column_name,
702                        "",
703                        "",
704                        trim_selects,
705                        ancestor_cte_scopes,
706                        depth + 1,
707                    ) {
708                        node.downstream.push(child);
709                    }
710                    break;
711                }
712            }
713        }
714    }
715
716    // 7. Column references — trace each column to its source
717    let col_refs = find_column_refs_in_expr(&select_expr);
718    for col_ref in col_refs {
719        let col_name = &col_ref.column;
720        if let Some(ref table_id) = col_ref.table {
721            let tbl = &table_id.name;
722            resolve_qualified_column(
723                &mut node,
724                scope,
725                dialect,
726                tbl,
727                col_name,
728                &column_name,
729                trim_selects,
730                &all_cte_scopes,
731                depth,
732            );
733        } else {
734            resolve_unqualified_column(
735                &mut node,
736                scope,
737                dialect,
738                col_name,
739                &column_name,
740                trim_selects,
741                &all_cte_scopes,
742                depth,
743            );
744        }
745    }
746
747    Ok(node)
748}
749
750// ---------------------------------------------------------------------------
751// Set operation handling
752// ---------------------------------------------------------------------------
753
754fn handle_set_operation(
755    column: &ColumnRef<'_>,
756    scope: &Scope,
757    dialect: Option<DialectType>,
758    scope_name: &str,
759    source_name: &str,
760    reference_node_name: &str,
761    trim_selects: bool,
762    ancestor_cte_scopes: &[Scope],
763    depth: usize,
764) -> Result<LineageNode> {
765    let scope_expr = &scope.expression;
766
767    // Determine column index
768    let col_index = match column {
769        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
770        ColumnRef::Index(i) => *i,
771    };
772
773    let col_name = match column {
774        ColumnRef::Name(name) => name.to_string(),
775        ColumnRef::Index(_) => format!("_{col_index}"),
776    };
777
778    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
779    node.source_name = source_name.to_string();
780    node.reference_node_name = reference_node_name.to_string();
781
782    // Recurse into each union branch
783    for branch_scope in &scope.union_scopes {
784        if let Ok(child) = to_node_inner(
785            ColumnRef::Index(col_index),
786            branch_scope,
787            dialect,
788            scope_name,
789            "",
790            "",
791            trim_selects,
792            ancestor_cte_scopes,
793            depth + 1,
794        ) {
795            node.downstream.push(child);
796        }
797    }
798
799    Ok(node)
800}
801
802// ---------------------------------------------------------------------------
803// Column resolution helpers
804// ---------------------------------------------------------------------------
805
806fn resolve_qualified_column(
807    node: &mut LineageNode,
808    scope: &Scope,
809    dialect: Option<DialectType>,
810    table: &str,
811    col_name: &str,
812    parent_name: &str,
813    trim_selects: bool,
814    all_cte_scopes: &[&Scope],
815    depth: usize,
816) {
817    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
818    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
819    let resolved_cte_name = resolve_cte_alias(scope, table);
820    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
821
822    // Check if table is a CTE reference — check both the current scope's cte_sources
823    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
824    let is_cte = scope.cte_sources.contains_key(effective_table)
825        || all_cte_scopes.iter().any(
826            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
827        );
828    if is_cte {
829        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
830            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
831            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
832            if let Ok(child) = to_node_inner(
833                ColumnRef::Name(col_name),
834                child_scope,
835                dialect,
836                parent_name,
837                effective_table,
838                parent_name,
839                trim_selects,
840                &ancestors,
841                depth + 1,
842            ) {
843                node.downstream.push(child);
844                return;
845            }
846        }
847    }
848
849    // Check if table is a derived table (is_scope = true in sources)
850    if let Some(source_info) = scope.sources.get(table) {
851        if source_info.is_scope {
852            if let Some(child_scope) = find_child_scope(scope, table) {
853                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
854                if let Ok(child) = to_node_inner(
855                    ColumnRef::Name(col_name),
856                    child_scope,
857                    dialect,
858                    parent_name,
859                    table,
860                    parent_name,
861                    trim_selects,
862                    &ancestors,
863                    depth + 1,
864                ) {
865                    node.downstream.push(child);
866                    return;
867                }
868            }
869        }
870    }
871
872    // Base table source found in current scope: preserve alias in the display name
873    // but store the resolved table expression and name for downstream consumers.
874    if let Some(source_info) = scope.sources.get(table) {
875        if !source_info.is_scope {
876            node.downstream.push(make_table_column_node_from_source(
877                table,
878                col_name,
879                &source_info.expression,
880            ));
881            return;
882        }
883    }
884
885    // Base table or unresolved — terminal node
886    node.downstream
887        .push(make_table_column_node(table, col_name));
888}
889
890/// Resolve a FROM alias to the original CTE name.
891///
892/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
893/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
894/// This function checks if `name` is such an alias and returns the CTE name.
895fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
896    // If it's already a known CTE name, no resolution needed
897    if scope.cte_sources.contains_key(name) {
898        return None;
899    }
900    // Check if the source's expression is a CTE — if so, extract the CTE name
901    if let Some(source_info) = scope.sources.get(name) {
902        if source_info.is_scope {
903            if let Expression::Cte(cte) = &source_info.expression {
904                let cte_name = &cte.alias.name;
905                if scope.cte_sources.contains_key(cte_name) {
906                    return Some(cte_name.clone());
907                }
908            }
909        }
910    }
911    None
912}
913
914fn resolve_unqualified_column(
915    node: &mut LineageNode,
916    scope: &Scope,
917    dialect: Option<DialectType>,
918    col_name: &str,
919    parent_name: &str,
920    trim_selects: bool,
921    all_cte_scopes: &[&Scope],
922    depth: usize,
923) {
924    // Try to find which source this column belongs to.
925    // Build the source list from the actual FROM/JOIN clauses to avoid
926    // mixing in CTE definitions that are in scope but not referenced.
927    let from_source_names = source_names_from_from_join(scope);
928
929    if from_source_names.len() == 1 {
930        let tbl = &from_source_names[0];
931        resolve_qualified_column(
932            node,
933            scope,
934            dialect,
935            tbl,
936            col_name,
937            parent_name,
938            trim_selects,
939            all_cte_scopes,
940            depth,
941        );
942        return;
943    }
944
945    // Multiple sources — can't resolve without schema info, add unqualified node
946    let child = LineageNode::new(
947        col_name.to_string(),
948        Expression::Column(Box::new(crate::expressions::Column {
949            name: crate::expressions::Identifier::new(col_name.to_string()),
950            table: None,
951            join_mark: false,
952            trailing_comments: vec![],
953            span: None,
954            inferred_type: None,
955        })),
956        node.source.clone(),
957    );
958    node.downstream.push(child);
959}
960
961fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
962    fn source_name(expr: &Expression) -> Option<String> {
963        match expr {
964            Expression::Table(table) => Some(
965                table
966                    .alias
967                    .as_ref()
968                    .map(|a| a.name.clone())
969                    .unwrap_or_else(|| table.name.name.clone()),
970            ),
971            Expression::Subquery(subquery) => {
972                subquery.alias.as_ref().map(|alias| alias.name.clone())
973            }
974            Expression::Paren(paren) => source_name(&paren.this),
975            _ => None,
976        }
977    }
978
979    let effective_expr = match &scope.expression {
980        Expression::Cte(cte) => &cte.this,
981        expr => expr,
982    };
983
984    let mut names = Vec::new();
985    let mut seen = std::collections::HashSet::new();
986
987    if let Expression::Select(select) = effective_expr {
988        if let Some(from) = &select.from {
989            for expr in &from.expressions {
990                if let Some(name) = source_name(expr) {
991                    if !name.is_empty() && seen.insert(name.clone()) {
992                        names.push(name);
993                    }
994                }
995            }
996        }
997        for join in &select.joins {
998            if let Some(name) = source_name(&join.this) {
999                if !name.is_empty() && seen.insert(name.clone()) {
1000                    names.push(name);
1001                }
1002            }
1003        }
1004    }
1005
1006    names
1007}
1008
1009// ---------------------------------------------------------------------------
1010// Helper functions
1011// ---------------------------------------------------------------------------
1012
1013/// Get the alias or name of an expression
1014fn get_alias_or_name(expr: &Expression) -> Option<String> {
1015    match expr {
1016        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1017        Expression::Column(col) => Some(col.name.name.clone()),
1018        Expression::Identifier(id) => Some(id.name.clone()),
1019        Expression::Star(_) => Some("*".to_string()),
1020        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1021        // Unwrap to get the actual column/alias name from the inner expression.
1022        Expression::Annotated(a) => get_alias_or_name(&a.this),
1023        _ => None,
1024    }
1025}
1026
1027/// Resolve the display name for a column reference.
1028fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1029    match column {
1030        ColumnRef::Name(n) => n.to_string(),
1031        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1032    }
1033}
1034
1035/// Find the select expression matching a column reference.
1036fn find_select_expr(
1037    scope_expr: &Expression,
1038    column: &ColumnRef<'_>,
1039    dialect: Option<DialectType>,
1040) -> Result<Expression> {
1041    if let Expression::Select(ref select) = scope_expr {
1042        match column {
1043            ColumnRef::Name(name) => {
1044                let normalized_name = normalize_column_name(name, dialect);
1045                for expr in &select.expressions {
1046                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1047                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1048                            return Ok(expr.clone());
1049                        }
1050                    }
1051                }
1052                Err(crate::error::Error::parse(
1053                    format!("Cannot find column '{}' in query", name),
1054                    0,
1055                    0,
1056                    0,
1057                    0,
1058                ))
1059            }
1060            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1061                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1062            }),
1063        }
1064    } else {
1065        Err(crate::error::Error::parse(
1066            "Expected SELECT expression for column lookup",
1067            0,
1068            0,
1069            0,
1070            0,
1071        ))
1072    }
1073}
1074
1075/// Find the positional index of a column name in a set operation's first SELECT branch.
1076fn column_to_index(
1077    set_op_expr: &Expression,
1078    name: &str,
1079    dialect: Option<DialectType>,
1080) -> Result<usize> {
1081    let normalized_name = normalize_column_name(name, dialect);
1082    let mut expr = set_op_expr;
1083    loop {
1084        match expr {
1085            Expression::Union(u) => expr = &u.left,
1086            Expression::Intersect(i) => expr = &i.left,
1087            Expression::Except(e) => expr = &e.left,
1088            Expression::Select(select) => {
1089                for (i, e) in select.expressions.iter().enumerate() {
1090                    if let Some(alias_or_name) = get_alias_or_name(e) {
1091                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1092                            return Ok(i);
1093                        }
1094                    }
1095                }
1096                return Err(crate::error::Error::parse(
1097                    format!("Cannot find column '{}' in set operation", name),
1098                    0,
1099                    0,
1100                    0,
1101                    0,
1102                ));
1103            }
1104            _ => {
1105                return Err(crate::error::Error::parse(
1106                    "Expected SELECT or set operation",
1107                    0,
1108                    0,
1109                    0,
1110                    0,
1111                ))
1112            }
1113        }
1114    }
1115}
1116
/// Normalize a column name for comparison under the given dialect.
///
/// Thin wrapper around `normalize_name` so that every column-name comparison
/// in this module uses the same flags.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    // NOTE(review): the two boolean flags are passed positionally; confirm
    // their meaning against `normalize_name`'s signature before changing them.
    normalize_name(name, dialect, false, true)
}
1120
1121/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1122fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1123    if let Expression::Select(select) = select_expr {
1124        let mut trimmed = select.as_ref().clone();
1125        trimmed.expressions = vec![target_expr.clone()];
1126        Expression::Select(Box::new(trimmed))
1127    } else {
1128        select_expr.clone()
1129    }
1130}
1131
1132/// Find the child scope (CTE or derived table) for a given source name.
1133fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1134    // Check CTE scopes
1135    if scope.cte_sources.contains_key(source_name) {
1136        for cte_scope in &scope.cte_scopes {
1137            if let Expression::Cte(cte) = &cte_scope.expression {
1138                if cte.alias.name == source_name {
1139                    return Some(cte_scope);
1140                }
1141            }
1142        }
1143    }
1144
1145    // Check derived table scopes
1146    if let Some(source_info) = scope.sources.get(source_name) {
1147        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1148            if let Expression::Subquery(sq) = &source_info.expression {
1149                for dt_scope in &scope.derived_table_scopes {
1150                    if dt_scope.expression == sq.this {
1151                        return Some(dt_scope);
1152                    }
1153                }
1154            }
1155        }
1156    }
1157
1158    None
1159}
1160
1161/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1162/// This handles nested CTEs where the current scope doesn't have the CTE scope
1163/// as a direct child but knows about it via cte_sources.
1164fn find_child_scope_in<'a>(
1165    all_cte_scopes: &[&'a Scope],
1166    scope: &'a Scope,
1167    source_name: &str,
1168) -> Option<&'a Scope> {
1169    // First try the scope's own cte_scopes
1170    for cte_scope in &scope.cte_scopes {
1171        if let Expression::Cte(cte) = &cte_scope.expression {
1172            if cte.alias.name == source_name {
1173                return Some(cte_scope);
1174            }
1175        }
1176    }
1177
1178    // Then search through all ancestor CTE scopes
1179    for cte_scope in all_cte_scopes {
1180        if let Expression::Cte(cte) = &cte_scope.expression {
1181            if cte.alias.name == source_name {
1182                return Some(cte_scope);
1183            }
1184        }
1185    }
1186
1187    // Fall back to derived table scopes
1188    if let Some(source_info) = scope.sources.get(source_name) {
1189        if source_info.is_scope {
1190            if let Expression::Subquery(sq) = &source_info.expression {
1191                for dt_scope in &scope.derived_table_scopes {
1192                    if dt_scope.expression == sq.this {
1193                        return Some(dt_scope);
1194                    }
1195                }
1196            }
1197        }
1198    }
1199
1200    None
1201}
1202
1203/// Create a terminal lineage node for a table.column reference.
1204fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1205    let mut node = LineageNode::new(
1206        format!("{}.{}", table, column),
1207        Expression::Column(Box::new(crate::expressions::Column {
1208            name: crate::expressions::Identifier::new(column.to_string()),
1209            table: Some(crate::expressions::Identifier::new(table.to_string())),
1210            join_mark: false,
1211            trailing_comments: vec![],
1212            span: None,
1213            inferred_type: None,
1214        })),
1215        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1216    );
1217    node.source_name = table.to_string();
1218    node
1219}
1220
1221fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1222    let mut parts: Vec<String> = Vec::new();
1223    if let Some(catalog) = &table_ref.catalog {
1224        parts.push(catalog.name.clone());
1225    }
1226    if let Some(schema) = &table_ref.schema {
1227        parts.push(schema.name.clone());
1228    }
1229    parts.push(table_ref.name.name.clone());
1230    parts.join(".")
1231}
1232
1233fn make_table_column_node_from_source(
1234    table_alias: &str,
1235    column: &str,
1236    source: &Expression,
1237) -> LineageNode {
1238    let mut node = LineageNode::new(
1239        format!("{}.{}", table_alias, column),
1240        Expression::Column(Box::new(crate::expressions::Column {
1241            name: crate::expressions::Identifier::new(column.to_string()),
1242            table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1243            join_mark: false,
1244            trailing_comments: vec![],
1245            span: None,
1246            inferred_type: None,
1247        })),
1248        source.clone(),
1249    );
1250
1251    if let Expression::Table(table_ref) = source {
1252        node.source_name = table_name_from_table_ref(table_ref);
1253    } else {
1254        node.source_name = table_alias.to_string();
1255    }
1256
1257    node
1258}
1259
/// Simple column reference extracted from an expression.
///
/// One value is produced by `collect_column_refs` for every
/// `Expression::Column` it visits.
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier copied from the column expression, if it had one.
    table: Option<crate::expressions::Identifier>,
    /// Column name, without any table qualifier.
    column: String,
}
1266
1267/// Find all column references in an expression (does not recurse into subqueries).
1268fn find_column_refs_in_expr(expr: &Expression) -> Vec<SimpleColumnRef> {
1269    let mut refs = Vec::new();
1270    collect_column_refs(expr, &mut refs);
1271    refs
1272}
1273
1274fn collect_column_refs(expr: &Expression, refs: &mut Vec<SimpleColumnRef>) {
1275    let mut stack: Vec<&Expression> = vec![expr];
1276
1277    while let Some(current) = stack.pop() {
1278        match current {
1279            // === Leaf: collect Column references ===
1280            Expression::Column(col) => {
1281                refs.push(SimpleColumnRef {
1282                    table: col.table.clone(),
1283                    column: col.name.name.clone(),
1284                });
1285            }
1286
1287            // === Boundary: don't recurse into subqueries (handled separately) ===
1288            Expression::Subquery(_) | Expression::Exists(_) => {}
1289
1290            // === BinaryOp variants: left, right ===
1291            Expression::And(op)
1292            | Expression::Or(op)
1293            | Expression::Eq(op)
1294            | Expression::Neq(op)
1295            | Expression::Lt(op)
1296            | Expression::Lte(op)
1297            | Expression::Gt(op)
1298            | Expression::Gte(op)
1299            | Expression::Add(op)
1300            | Expression::Sub(op)
1301            | Expression::Mul(op)
1302            | Expression::Div(op)
1303            | Expression::Mod(op)
1304            | Expression::BitwiseAnd(op)
1305            | Expression::BitwiseOr(op)
1306            | Expression::BitwiseXor(op)
1307            | Expression::BitwiseLeftShift(op)
1308            | Expression::BitwiseRightShift(op)
1309            | Expression::Concat(op)
1310            | Expression::Adjacent(op)
1311            | Expression::TsMatch(op)
1312            | Expression::PropertyEQ(op)
1313            | Expression::ArrayContainsAll(op)
1314            | Expression::ArrayContainedBy(op)
1315            | Expression::ArrayOverlaps(op)
1316            | Expression::JSONBContainsAllTopKeys(op)
1317            | Expression::JSONBContainsAnyTopKeys(op)
1318            | Expression::JSONBDeleteAtPath(op)
1319            | Expression::ExtendsLeft(op)
1320            | Expression::ExtendsRight(op)
1321            | Expression::Is(op)
1322            | Expression::MemberOf(op)
1323            | Expression::NullSafeEq(op)
1324            | Expression::NullSafeNeq(op)
1325            | Expression::Glob(op)
1326            | Expression::Match(op) => {
1327                stack.push(&op.left);
1328                stack.push(&op.right);
1329            }
1330
1331            // === UnaryOp variants: this ===
1332            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1333                stack.push(&u.this);
1334            }
1335
1336            // === UnaryFunc variants: this ===
1337            Expression::Upper(f)
1338            | Expression::Lower(f)
1339            | Expression::Length(f)
1340            | Expression::LTrim(f)
1341            | Expression::RTrim(f)
1342            | Expression::Reverse(f)
1343            | Expression::Abs(f)
1344            | Expression::Sqrt(f)
1345            | Expression::Cbrt(f)
1346            | Expression::Ln(f)
1347            | Expression::Exp(f)
1348            | Expression::Sign(f)
1349            | Expression::Date(f)
1350            | Expression::Time(f)
1351            | Expression::DateFromUnixDate(f)
1352            | Expression::UnixDate(f)
1353            | Expression::UnixSeconds(f)
1354            | Expression::UnixMillis(f)
1355            | Expression::UnixMicros(f)
1356            | Expression::TimeStrToDate(f)
1357            | Expression::DateToDi(f)
1358            | Expression::DiToDate(f)
1359            | Expression::TsOrDiToDi(f)
1360            | Expression::TsOrDsToDatetime(f)
1361            | Expression::TsOrDsToTimestamp(f)
1362            | Expression::YearOfWeek(f)
1363            | Expression::YearOfWeekIso(f)
1364            | Expression::Initcap(f)
1365            | Expression::Ascii(f)
1366            | Expression::Chr(f)
1367            | Expression::Soundex(f)
1368            | Expression::ByteLength(f)
1369            | Expression::Hex(f)
1370            | Expression::LowerHex(f)
1371            | Expression::Unicode(f)
1372            | Expression::Radians(f)
1373            | Expression::Degrees(f)
1374            | Expression::Sin(f)
1375            | Expression::Cos(f)
1376            | Expression::Tan(f)
1377            | Expression::Asin(f)
1378            | Expression::Acos(f)
1379            | Expression::Atan(f)
1380            | Expression::IsNan(f)
1381            | Expression::IsInf(f)
1382            | Expression::ArrayLength(f)
1383            | Expression::ArraySize(f)
1384            | Expression::Cardinality(f)
1385            | Expression::ArrayReverse(f)
1386            | Expression::ArrayDistinct(f)
1387            | Expression::ArrayFlatten(f)
1388            | Expression::ArrayCompact(f)
1389            | Expression::Explode(f)
1390            | Expression::ExplodeOuter(f)
1391            | Expression::ToArray(f)
1392            | Expression::MapFromEntries(f)
1393            | Expression::MapKeys(f)
1394            | Expression::MapValues(f)
1395            | Expression::JsonArrayLength(f)
1396            | Expression::JsonKeys(f)
1397            | Expression::JsonType(f)
1398            | Expression::ParseJson(f)
1399            | Expression::ToJson(f)
1400            | Expression::Typeof(f)
1401            | Expression::BitwiseCount(f)
1402            | Expression::Year(f)
1403            | Expression::Month(f)
1404            | Expression::Day(f)
1405            | Expression::Hour(f)
1406            | Expression::Minute(f)
1407            | Expression::Second(f)
1408            | Expression::DayOfWeek(f)
1409            | Expression::DayOfWeekIso(f)
1410            | Expression::DayOfMonth(f)
1411            | Expression::DayOfYear(f)
1412            | Expression::WeekOfYear(f)
1413            | Expression::Quarter(f)
1414            | Expression::Epoch(f)
1415            | Expression::EpochMs(f)
1416            | Expression::TimeStrToUnix(f)
1417            | Expression::SHA(f)
1418            | Expression::SHA1Digest(f)
1419            | Expression::TimeToUnix(f)
1420            | Expression::JSONBool(f)
1421            | Expression::Int64(f)
1422            | Expression::MD5NumberLower64(f)
1423            | Expression::MD5NumberUpper64(f)
1424            | Expression::DateStrToDate(f)
1425            | Expression::DateToDateStr(f) => {
1426                stack.push(&f.this);
1427            }
1428
1429            // === BinaryFunc variants: this, expression ===
1430            Expression::Power(f)
1431            | Expression::NullIf(f)
1432            | Expression::IfNull(f)
1433            | Expression::Nvl(f)
1434            | Expression::UnixToTimeStr(f)
1435            | Expression::Contains(f)
1436            | Expression::StartsWith(f)
1437            | Expression::EndsWith(f)
1438            | Expression::Levenshtein(f)
1439            | Expression::ModFunc(f)
1440            | Expression::Atan2(f)
1441            | Expression::IntDiv(f)
1442            | Expression::AddMonths(f)
1443            | Expression::MonthsBetween(f)
1444            | Expression::NextDay(f)
1445            | Expression::ArrayContains(f)
1446            | Expression::ArrayPosition(f)
1447            | Expression::ArrayAppend(f)
1448            | Expression::ArrayPrepend(f)
1449            | Expression::ArrayUnion(f)
1450            | Expression::ArrayExcept(f)
1451            | Expression::ArrayRemove(f)
1452            | Expression::StarMap(f)
1453            | Expression::MapFromArrays(f)
1454            | Expression::MapContainsKey(f)
1455            | Expression::ElementAt(f)
1456            | Expression::JsonMergePatch(f)
1457            | Expression::JSONBContains(f)
1458            | Expression::JSONBExtract(f) => {
1459                stack.push(&f.this);
1460                stack.push(&f.expression);
1461            }
1462
1463            // === VarArgFunc variants: expressions ===
1464            Expression::Greatest(f)
1465            | Expression::Least(f)
1466            | Expression::Coalesce(f)
1467            | Expression::ArrayConcat(f)
1468            | Expression::ArrayIntersect(f)
1469            | Expression::ArrayZip(f)
1470            | Expression::MapConcat(f)
1471            | Expression::JsonArray(f) => {
1472                for e in &f.expressions {
1473                    stack.push(e);
1474                }
1475            }
1476
1477            // === AggFunc variants: this, filter, having_max, limit ===
1478            Expression::Sum(f)
1479            | Expression::Avg(f)
1480            | Expression::Min(f)
1481            | Expression::Max(f)
1482            | Expression::ArrayAgg(f)
1483            | Expression::CountIf(f)
1484            | Expression::Stddev(f)
1485            | Expression::StddevPop(f)
1486            | Expression::StddevSamp(f)
1487            | Expression::Variance(f)
1488            | Expression::VarPop(f)
1489            | Expression::VarSamp(f)
1490            | Expression::Median(f)
1491            | Expression::Mode(f)
1492            | Expression::First(f)
1493            | Expression::Last(f)
1494            | Expression::AnyValue(f)
1495            | Expression::ApproxDistinct(f)
1496            | Expression::ApproxCountDistinct(f)
1497            | Expression::LogicalAnd(f)
1498            | Expression::LogicalOr(f)
1499            | Expression::Skewness(f)
1500            | Expression::ArrayConcatAgg(f)
1501            | Expression::ArrayUniqueAgg(f)
1502            | Expression::BoolXorAgg(f)
1503            | Expression::BitwiseAndAgg(f)
1504            | Expression::BitwiseOrAgg(f)
1505            | Expression::BitwiseXorAgg(f) => {
1506                stack.push(&f.this);
1507                if let Some(ref filter) = f.filter {
1508                    stack.push(filter);
1509                }
1510                if let Some((ref expr, _)) = f.having_max {
1511                    stack.push(expr);
1512                }
1513                if let Some(ref limit) = f.limit {
1514                    stack.push(limit);
1515                }
1516            }
1517
1518            // === Generic Function / AggregateFunction: args ===
1519            Expression::Function(func) => {
1520                for arg in &func.args {
1521                    stack.push(arg);
1522                }
1523            }
1524            Expression::AggregateFunction(func) => {
1525                for arg in &func.args {
1526                    stack.push(arg);
1527                }
1528                if let Some(ref filter) = func.filter {
1529                    stack.push(filter);
1530                }
1531                if let Some(ref limit) = func.limit {
1532                    stack.push(limit);
1533                }
1534            }
1535
1536            // === WindowFunction: this (skip Over for lineage purposes) ===
1537            Expression::WindowFunction(wf) => {
1538                stack.push(&wf.this);
1539            }
1540
1541            // === Containers and special expressions ===
1542            Expression::Alias(a) => {
1543                stack.push(&a.this);
1544            }
1545            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1546                stack.push(&c.this);
1547                if let Some(ref fmt) = c.format {
1548                    stack.push(fmt);
1549                }
1550                if let Some(ref def) = c.default {
1551                    stack.push(def);
1552                }
1553            }
1554            Expression::Paren(p) => {
1555                stack.push(&p.this);
1556            }
1557            Expression::Annotated(a) => {
1558                stack.push(&a.this);
1559            }
1560            Expression::Case(case) => {
1561                if let Some(ref operand) = case.operand {
1562                    stack.push(operand);
1563                }
1564                for (cond, result) in &case.whens {
1565                    stack.push(cond);
1566                    stack.push(result);
1567                }
1568                if let Some(ref else_expr) = case.else_ {
1569                    stack.push(else_expr);
1570                }
1571            }
1572            Expression::Collation(c) => {
1573                stack.push(&c.this);
1574            }
1575            Expression::In(i) => {
1576                stack.push(&i.this);
1577                for e in &i.expressions {
1578                    stack.push(e);
1579                }
1580                if let Some(ref q) = i.query {
1581                    stack.push(q);
1582                }
1583                if let Some(ref u) = i.unnest {
1584                    stack.push(u);
1585                }
1586            }
1587            Expression::Between(b) => {
1588                stack.push(&b.this);
1589                stack.push(&b.low);
1590                stack.push(&b.high);
1591            }
1592            Expression::IsNull(n) => {
1593                stack.push(&n.this);
1594            }
1595            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1596                stack.push(&t.this);
1597            }
1598            Expression::IsJson(j) => {
1599                stack.push(&j.this);
1600            }
1601            Expression::Like(l) | Expression::ILike(l) => {
1602                stack.push(&l.left);
1603                stack.push(&l.right);
1604                if let Some(ref esc) = l.escape {
1605                    stack.push(esc);
1606                }
1607            }
1608            Expression::SimilarTo(s) => {
1609                stack.push(&s.this);
1610                stack.push(&s.pattern);
1611                if let Some(ref esc) = s.escape {
1612                    stack.push(esc);
1613                }
1614            }
1615            Expression::Ordered(o) => {
1616                stack.push(&o.this);
1617            }
1618            Expression::Array(a) => {
1619                for e in &a.expressions {
1620                    stack.push(e);
1621                }
1622            }
1623            Expression::Tuple(t) => {
1624                for e in &t.expressions {
1625                    stack.push(e);
1626                }
1627            }
1628            Expression::Struct(s) => {
1629                for (_, e) in &s.fields {
1630                    stack.push(e);
1631                }
1632            }
1633            Expression::Subscript(s) => {
1634                stack.push(&s.this);
1635                stack.push(&s.index);
1636            }
1637            Expression::Dot(d) => {
1638                stack.push(&d.this);
1639            }
1640            Expression::MethodCall(m) => {
1641                stack.push(&m.this);
1642                for arg in &m.args {
1643                    stack.push(arg);
1644                }
1645            }
1646            Expression::ArraySlice(s) => {
1647                stack.push(&s.this);
1648                if let Some(ref start) = s.start {
1649                    stack.push(start);
1650                }
1651                if let Some(ref end) = s.end {
1652                    stack.push(end);
1653                }
1654            }
1655            Expression::Lambda(l) => {
1656                stack.push(&l.body);
1657            }
1658            Expression::NamedArgument(n) => {
1659                stack.push(&n.value);
1660            }
1661            Expression::TryCatch(t) => {
1662                for stmt in &t.try_body {
1663                    stack.push(stmt);
1664                }
1665                if let Some(catch_body) = &t.catch_body {
1666                    for stmt in catch_body {
1667                        stack.push(stmt);
1668                    }
1669                }
1670            }
1671            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1672                stack.push(e);
1673            }
1674
1675            // === Custom function structs ===
1676            Expression::Substring(f) => {
1677                stack.push(&f.this);
1678                stack.push(&f.start);
1679                if let Some(ref len) = f.length {
1680                    stack.push(len);
1681                }
1682            }
1683            Expression::Trim(f) => {
1684                stack.push(&f.this);
1685                if let Some(ref chars) = f.characters {
1686                    stack.push(chars);
1687                }
1688            }
1689            Expression::Replace(f) => {
1690                stack.push(&f.this);
1691                stack.push(&f.old);
1692                stack.push(&f.new);
1693            }
1694            Expression::IfFunc(f) => {
1695                stack.push(&f.condition);
1696                stack.push(&f.true_value);
1697                if let Some(ref fv) = f.false_value {
1698                    stack.push(fv);
1699                }
1700            }
1701            Expression::Nvl2(f) => {
1702                stack.push(&f.this);
1703                stack.push(&f.true_value);
1704                stack.push(&f.false_value);
1705            }
1706            Expression::ConcatWs(f) => {
1707                stack.push(&f.separator);
1708                for e in &f.expressions {
1709                    stack.push(e);
1710                }
1711            }
1712            Expression::Count(f) => {
1713                if let Some(ref this) = f.this {
1714                    stack.push(this);
1715                }
1716                if let Some(ref filter) = f.filter {
1717                    stack.push(filter);
1718                }
1719            }
1720            Expression::GroupConcat(f) => {
1721                stack.push(&f.this);
1722                if let Some(ref sep) = f.separator {
1723                    stack.push(sep);
1724                }
1725                if let Some(ref filter) = f.filter {
1726                    stack.push(filter);
1727                }
1728            }
1729            Expression::StringAgg(f) => {
1730                stack.push(&f.this);
1731                if let Some(ref sep) = f.separator {
1732                    stack.push(sep);
1733                }
1734                if let Some(ref filter) = f.filter {
1735                    stack.push(filter);
1736                }
1737                if let Some(ref limit) = f.limit {
1738                    stack.push(limit);
1739                }
1740            }
1741            Expression::ListAgg(f) => {
1742                stack.push(&f.this);
1743                if let Some(ref sep) = f.separator {
1744                    stack.push(sep);
1745                }
1746                if let Some(ref filter) = f.filter {
1747                    stack.push(filter);
1748                }
1749            }
1750            Expression::SumIf(f) => {
1751                stack.push(&f.this);
1752                stack.push(&f.condition);
1753                if let Some(ref filter) = f.filter {
1754                    stack.push(filter);
1755                }
1756            }
1757            Expression::DateAdd(f) | Expression::DateSub(f) => {
1758                stack.push(&f.this);
1759                stack.push(&f.interval);
1760            }
1761            Expression::DateDiff(f) => {
1762                stack.push(&f.this);
1763                stack.push(&f.expression);
1764            }
1765            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1766                stack.push(&f.this);
1767            }
1768            Expression::Extract(f) => {
1769                stack.push(&f.this);
1770            }
1771            Expression::Round(f) => {
1772                stack.push(&f.this);
1773                if let Some(ref d) = f.decimals {
1774                    stack.push(d);
1775                }
1776            }
1777            Expression::Floor(f) => {
1778                stack.push(&f.this);
1779                if let Some(ref s) = f.scale {
1780                    stack.push(s);
1781                }
1782                if let Some(ref t) = f.to {
1783                    stack.push(t);
1784                }
1785            }
1786            Expression::Ceil(f) => {
1787                stack.push(&f.this);
1788                if let Some(ref d) = f.decimals {
1789                    stack.push(d);
1790                }
1791                if let Some(ref t) = f.to {
1792                    stack.push(t);
1793                }
1794            }
1795            Expression::Log(f) => {
1796                stack.push(&f.this);
1797                if let Some(ref b) = f.base {
1798                    stack.push(b);
1799                }
1800            }
1801            Expression::AtTimeZone(f) => {
1802                stack.push(&f.this);
1803                stack.push(&f.zone);
1804            }
1805            Expression::Lead(f) | Expression::Lag(f) => {
1806                stack.push(&f.this);
1807                if let Some(ref off) = f.offset {
1808                    stack.push(off);
1809                }
1810                if let Some(ref def) = f.default {
1811                    stack.push(def);
1812                }
1813            }
1814            Expression::FirstValue(f) | Expression::LastValue(f) => {
1815                stack.push(&f.this);
1816            }
1817            Expression::NthValue(f) => {
1818                stack.push(&f.this);
1819                stack.push(&f.offset);
1820            }
1821            Expression::Position(f) => {
1822                stack.push(&f.substring);
1823                stack.push(&f.string);
1824                if let Some(ref start) = f.start {
1825                    stack.push(start);
1826                }
1827            }
1828            Expression::Decode(f) => {
1829                stack.push(&f.this);
1830                for (search, result) in &f.search_results {
1831                    stack.push(search);
1832                    stack.push(result);
1833                }
1834                if let Some(ref def) = f.default {
1835                    stack.push(def);
1836                }
1837            }
1838            Expression::CharFunc(f) => {
1839                for arg in &f.args {
1840                    stack.push(arg);
1841                }
1842            }
1843            Expression::ArraySort(f) => {
1844                stack.push(&f.this);
1845                if let Some(ref cmp) = f.comparator {
1846                    stack.push(cmp);
1847                }
1848            }
1849            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1850                stack.push(&f.this);
1851                stack.push(&f.separator);
1852                if let Some(ref nr) = f.null_replacement {
1853                    stack.push(nr);
1854                }
1855            }
1856            Expression::ArrayFilter(f) => {
1857                stack.push(&f.this);
1858                stack.push(&f.filter);
1859            }
1860            Expression::ArrayTransform(f) => {
1861                stack.push(&f.this);
1862                stack.push(&f.transform);
1863            }
1864            Expression::Sequence(f)
1865            | Expression::Generate(f)
1866            | Expression::ExplodingGenerateSeries(f) => {
1867                stack.push(&f.start);
1868                stack.push(&f.stop);
1869                if let Some(ref step) = f.step {
1870                    stack.push(step);
1871                }
1872            }
1873            Expression::JsonExtract(f)
1874            | Expression::JsonExtractScalar(f)
1875            | Expression::JsonQuery(f)
1876            | Expression::JsonValue(f) => {
1877                stack.push(&f.this);
1878                stack.push(&f.path);
1879            }
1880            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1881                stack.push(&f.this);
1882                for p in &f.paths {
1883                    stack.push(p);
1884                }
1885            }
1886            Expression::JsonObject(f) => {
1887                for (k, v) in &f.pairs {
1888                    stack.push(k);
1889                    stack.push(v);
1890                }
1891            }
1892            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1893                stack.push(&f.this);
1894                for (path, val) in &f.path_values {
1895                    stack.push(path);
1896                    stack.push(val);
1897                }
1898            }
1899            Expression::Overlay(f) => {
1900                stack.push(&f.this);
1901                stack.push(&f.replacement);
1902                stack.push(&f.from);
1903                if let Some(ref len) = f.length {
1904                    stack.push(len);
1905                }
1906            }
1907            Expression::Convert(f) => {
1908                stack.push(&f.this);
1909                if let Some(ref style) = f.style {
1910                    stack.push(style);
1911                }
1912            }
1913            Expression::ApproxPercentile(f) => {
1914                stack.push(&f.this);
1915                stack.push(&f.percentile);
1916                if let Some(ref acc) = f.accuracy {
1917                    stack.push(acc);
1918                }
1919                if let Some(ref filter) = f.filter {
1920                    stack.push(filter);
1921                }
1922            }
1923            Expression::Percentile(f)
1924            | Expression::PercentileCont(f)
1925            | Expression::PercentileDisc(f) => {
1926                stack.push(&f.this);
1927                stack.push(&f.percentile);
1928                if let Some(ref filter) = f.filter {
1929                    stack.push(filter);
1930                }
1931            }
1932            Expression::WithinGroup(f) => {
1933                stack.push(&f.this);
1934            }
1935            Expression::Left(f) | Expression::Right(f) => {
1936                stack.push(&f.this);
1937                stack.push(&f.length);
1938            }
1939            Expression::Repeat(f) => {
1940                stack.push(&f.this);
1941                stack.push(&f.times);
1942            }
1943            Expression::Lpad(f) | Expression::Rpad(f) => {
1944                stack.push(&f.this);
1945                stack.push(&f.length);
1946                if let Some(ref fill) = f.fill {
1947                    stack.push(fill);
1948                }
1949            }
1950            Expression::Split(f) => {
1951                stack.push(&f.this);
1952                stack.push(&f.delimiter);
1953            }
1954            Expression::RegexpLike(f) => {
1955                stack.push(&f.this);
1956                stack.push(&f.pattern);
1957                if let Some(ref flags) = f.flags {
1958                    stack.push(flags);
1959                }
1960            }
1961            Expression::RegexpReplace(f) => {
1962                stack.push(&f.this);
1963                stack.push(&f.pattern);
1964                stack.push(&f.replacement);
1965                if let Some(ref flags) = f.flags {
1966                    stack.push(flags);
1967                }
1968            }
1969            Expression::RegexpExtract(f) => {
1970                stack.push(&f.this);
1971                stack.push(&f.pattern);
1972                if let Some(ref group) = f.group {
1973                    stack.push(group);
1974                }
1975            }
1976            Expression::ToDate(f) => {
1977                stack.push(&f.this);
1978                if let Some(ref fmt) = f.format {
1979                    stack.push(fmt);
1980                }
1981            }
1982            Expression::ToTimestamp(f) => {
1983                stack.push(&f.this);
1984                if let Some(ref fmt) = f.format {
1985                    stack.push(fmt);
1986                }
1987            }
1988            Expression::DateFormat(f) | Expression::FormatDate(f) => {
1989                stack.push(&f.this);
1990                stack.push(&f.format);
1991            }
1992            Expression::LastDay(f) => {
1993                stack.push(&f.this);
1994            }
1995            Expression::FromUnixtime(f) => {
1996                stack.push(&f.this);
1997                if let Some(ref fmt) = f.format {
1998                    stack.push(fmt);
1999                }
2000            }
2001            Expression::UnixTimestamp(f) => {
2002                if let Some(ref this) = f.this {
2003                    stack.push(this);
2004                }
2005                if let Some(ref fmt) = f.format {
2006                    stack.push(fmt);
2007                }
2008            }
2009            Expression::MakeDate(f) => {
2010                stack.push(&f.year);
2011                stack.push(&f.month);
2012                stack.push(&f.day);
2013            }
2014            Expression::MakeTimestamp(f) => {
2015                stack.push(&f.year);
2016                stack.push(&f.month);
2017                stack.push(&f.day);
2018                stack.push(&f.hour);
2019                stack.push(&f.minute);
2020                stack.push(&f.second);
2021                if let Some(ref tz) = f.timezone {
2022                    stack.push(tz);
2023                }
2024            }
2025            Expression::TruncFunc(f) => {
2026                stack.push(&f.this);
2027                if let Some(ref d) = f.decimals {
2028                    stack.push(d);
2029                }
2030            }
2031            Expression::ArrayFunc(f) => {
2032                for e in &f.expressions {
2033                    stack.push(e);
2034                }
2035            }
2036            Expression::Unnest(f) => {
2037                stack.push(&f.this);
2038                for e in &f.expressions {
2039                    stack.push(e);
2040                }
2041            }
2042            Expression::StructFunc(f) => {
2043                for (_, e) in &f.fields {
2044                    stack.push(e);
2045                }
2046            }
2047            Expression::StructExtract(f) => {
2048                stack.push(&f.this);
2049            }
2050            Expression::NamedStruct(f) => {
2051                for (k, v) in &f.pairs {
2052                    stack.push(k);
2053                    stack.push(v);
2054                }
2055            }
2056            Expression::MapFunc(f) => {
2057                for k in &f.keys {
2058                    stack.push(k);
2059                }
2060                for v in &f.values {
2061                    stack.push(v);
2062                }
2063            }
2064            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2065                stack.push(&f.this);
2066                stack.push(&f.transform);
2067            }
2068            Expression::JsonArrayAgg(f) => {
2069                stack.push(&f.this);
2070                if let Some(ref filter) = f.filter {
2071                    stack.push(filter);
2072                }
2073            }
2074            Expression::JsonObjectAgg(f) => {
2075                stack.push(&f.key);
2076                stack.push(&f.value);
2077                if let Some(ref filter) = f.filter {
2078                    stack.push(filter);
2079                }
2080            }
2081            Expression::NTile(f) => {
2082                if let Some(ref n) = f.num_buckets {
2083                    stack.push(n);
2084                }
2085            }
2086            Expression::Rand(f) => {
2087                if let Some(ref s) = f.seed {
2088                    stack.push(s);
2089                }
2090                if let Some(ref lo) = f.lower {
2091                    stack.push(lo);
2092                }
2093                if let Some(ref hi) = f.upper {
2094                    stack.push(hi);
2095                }
2096            }
2097            Expression::Any(q) | Expression::All(q) => {
2098                stack.push(&q.this);
2099                stack.push(&q.subquery);
2100            }
2101            Expression::Overlaps(o) => {
2102                if let Some(ref this) = o.this {
2103                    stack.push(this);
2104                }
2105                if let Some(ref expr) = o.expression {
2106                    stack.push(expr);
2107                }
2108                if let Some(ref ls) = o.left_start {
2109                    stack.push(ls);
2110                }
2111                if let Some(ref le) = o.left_end {
2112                    stack.push(le);
2113                }
2114                if let Some(ref rs) = o.right_start {
2115                    stack.push(rs);
2116                }
2117                if let Some(ref re) = o.right_end {
2118                    stack.push(re);
2119                }
2120            }
2121            Expression::Interval(i) => {
2122                if let Some(ref this) = i.this {
2123                    stack.push(this);
2124                }
2125            }
2126            Expression::TimeStrToTime(f) => {
2127                stack.push(&f.this);
2128                if let Some(ref zone) = f.zone {
2129                    stack.push(zone);
2130                }
2131            }
2132            Expression::JSONBExtractScalar(f) => {
2133                stack.push(&f.this);
2134                stack.push(&f.expression);
2135                if let Some(ref jt) = f.json_type {
2136                    stack.push(jt);
2137                }
2138            }
2139
2140            // === True leaves and non-expression-bearing nodes ===
2141            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2142            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2143            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2144            _ => {}
2145        }
2146    }
2147}
2148
2149// ---------------------------------------------------------------------------
2150// Tests
2151// ---------------------------------------------------------------------------
2152
2153#[cfg(test)]
2154mod tests {
2155    use super::*;
2156    use crate::dialects::{Dialect, DialectType};
2157    use crate::expressions::DataType;
2158    use crate::optimizer::annotate_types::annotate_types;
2159    use crate::parse_one;
2160    use crate::schema::{MappingSchema, Schema};
2161
2162    fn parse(sql: &str) -> Expression {
2163        let dialect = Dialect::get(DialectType::Generic);
2164        let ast = dialect.parse(sql).unwrap();
2165        ast.into_iter().next().unwrap()
2166    }
2167
/// A bare column selected from a single table must trace back to `t.a`.
#[test]
fn test_simple_lineage() {
    let query = parse("SELECT a FROM t");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let has_downstream = !root.downstream.is_empty();
    assert!(has_downstream, "Should have downstream nodes");

    // The requested column should be reported as coming from `t.a`.
    let downstream = root.downstream_names();
    let traces_to_source = downstream.iter().any(|name| name == "t.a");
    assert!(
        traces_to_source,
        "Expected t.a in downstream, got: {:?}",
        downstream
    );
}
2183
/// `walk` on a hand-built two-node graph yields the root first, then its downstream child.
#[test]
fn test_lineage_walk() {
    // Placeholder expression; the walk only looks at node names here.
    let null = || Expression::Null(crate::expressions::Null);

    let mut root = LineageNode::new("col_a", null(), null());
    root.downstream
        .push(LineageNode::new("t.a", null(), null()));

    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    assert_eq!(visited.len(), 2);
    assert_eq!(visited[0], "col_a");
    assert_eq!(visited[1], "t.a");
}
2204
/// Lineage of an aliased arithmetic expression is looked up by alias and still reaches `a`.
#[test]
fn test_aliased_column() {
    let query = parse("SELECT a + 1 AS b FROM t");
    let root = lineage("b", &query, None, false).unwrap();
    assert_eq!(root.name, "b");

    // Walk the whole graph: some node must mention the underlying column `a`.
    let visited: Vec<_> = root.walk().map(|node| node.name.clone()).collect();
    let reaches_a = visited.iter().any(|name| name.contains("a"));
    assert!(
        reaches_a,
        "Expected to trace to column a, got: {:?}",
        visited
    );
}
2219
/// A table-qualified select item (`t.a`) is looked up by its bare column name.
#[test]
fn test_qualified_column() {
    let query = parse("SELECT t.a FROM t");
    let root = lineage("a", &query, None, false).unwrap();
    assert_eq!(root.name, "a");

    let downstream = root.downstream_names();
    let traces_to_source = downstream.iter().any(|name| name == "t.a");
    assert!(traces_to_source, "Expected t.a, got: {:?}", downstream);
}
2233
/// An unqualified column with exactly one source table resolves to that table.
#[test]
fn test_unqualified_column() {
    let query = parse("SELECT a FROM t");
    let root = lineage("a", &query, None, false).unwrap();

    // Only `t` is in scope, so `a` is expected to be reported as `t.a`.
    let downstream = root.downstream_names();
    let traces_to_source = downstream.iter().any(|name| name == "t.a");
    assert!(traces_to_source, "Expected t.a, got: {:?}", downstream);
}
2247
/// Regression test for issue 40: the root expression returned by `lineage_with_schema`
/// can be type-annotated against the schema, while the root from plain `lineage` cannot.
#[test]
fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
    let dialect = Dialect::get(DialectType::BigQuery);
    let statements = dialect.parse("SELECT name FROM users").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("users", &[("name".into(), DataType::Text)], None)
        .expect("schema setup");

    // Baseline: `lineage` receives no schema, and annotating its root expression
    // afterwards is expected to leave the type unresolved.
    let plain_root = lineage("name", &expr, Some(DialectType::BigQuery), false)
        .expect("lineage without schema");
    let mut plain_expr = plain_root.expression.clone();
    annotate_types(
        &mut plain_expr,
        Some(&schema),
        Some(DialectType::BigQuery),
    );
    assert_eq!(
        plain_expr.inferred_type(),
        None,
        "Expected unresolved root type without schema-aware lineage qualification"
    );

    // Schema-aware path: the same annotation pass must now infer TEXT.
    let schema_root = lineage_with_schema(
        "name",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage with schema");
    let mut schema_expr = schema_root.expression.clone();
    annotate_types(
        &mut schema_expr,
        Some(&schema),
        Some(DialectType::BigQuery),
    );

    assert_eq!(schema_expr.inferred_type(), Some(&DataType::Text));
}
2291
/// `lineage_with_schema` must succeed on a query whose select list contains a
/// correlated scalar subquery.
#[test]
fn test_lineage_with_schema_correlated_scalar_subquery() {
    let dialect = Dialect::get(DialectType::BigQuery);
    let statements = dialect
        .parse("SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1")
        .unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    // Every column in both tables uses the same BIGINT type.
    let bigint = || DataType::BigInt { length: None };

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table("t1", &[("id".into(), bigint())], None)
        .expect("schema setup");
    schema
        .add_table(
            "t2",
            &[("id".into(), bigint()), ("val".into(), bigint())],
            None,
        )
        .expect("schema setup");

    let root = lineage_with_schema(
        "id",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle correlated scalar subqueries");

    assert_eq!(root.name, "id");
}
2333
/// `lineage_with_schema` must succeed when the column comes from a `JOIN ... USING` clause.
#[test]
fn test_lineage_with_schema_join_using() {
    let dialect = Dialect::get(DialectType::BigQuery);
    let statements = dialect.parse("SELECT a FROM t1 JOIN t2 USING(a)").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    // Both joined tables expose the same single BIGINT column `a`.
    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    for table in ["t1", "t2"] {
        schema
            .add_table(
                table,
                &[("a".into(), DataType::BigInt { length: None })],
                None,
            )
            .expect("schema setup");
    }

    let root = lineage_with_schema(
        "a",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle JOIN USING");

    assert_eq!(root.name, "a");
}
2372
/// `lineage_with_schema` must succeed when the table is registered and referenced
/// under a dotted `schema.table` name.
#[test]
fn test_lineage_with_schema_qualified_table_name() {
    let dialect = Dialect::get(DialectType::BigQuery);
    let statements = dialect.parse("SELECT a FROM raw.t1").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    let registration = schema.add_table(
        "raw.t1",
        &[("a".into(), DataType::BigInt { length: None })],
        None,
    );
    registration.expect("schema setup");

    let root = lineage_with_schema(
        "a",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should handle dotted schema.table names");

    assert_eq!(root.name, "a");
}
2404
/// With neither schema nor dialect, `lineage_with_schema` must agree with plain `lineage`
/// on the root name and the downstream names.
#[test]
fn test_lineage_with_schema_none_matches_lineage() {
    let query = parse("SELECT a FROM t");

    let expected = lineage("a", &query, None, false).expect("lineage baseline");
    let actual =
        lineage_with_schema("a", &query, None, None, false).expect("lineage_with_schema");

    assert_eq!(actual.name, expected.name);
    assert_eq!(actual.downstream_names(), expected.downstream_names());
}
2415
/// Regression test for issue 60: a mixed-case BigQuery column (`Name`) registered in the
/// schema must show up in the downstream names as `teams.Name`.
#[test]
fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
    let dialect = Dialect::get(DialectType::BigQuery);
    let statements = dialect.parse("SELECT Name AS name FROM teams").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
    schema
        .add_table(
            "teams",
            &[("Name".into(), DataType::String { length: None })],
            None,
        )
        .expect("schema setup");

    // Look the column up by its lower-case alias.
    let root = lineage_with_schema(
        "name",
        &expr,
        Some(&schema),
        Some(DialectType::BigQuery),
        false,
    )
    .expect("lineage_with_schema should resolve mixed-case BigQuery columns");

    let downstream = root.downstream_names();
    let keeps_source_case = downstream.iter().any(|name| name == "teams.Name");
    assert!(
        keeps_source_case,
        "Expected teams.Name in downstream, got: {:?}",
        downstream
    );
}
2451
/// Under BigQuery, looking up `name` must find a select item aliased as `Name`.
#[test]
fn test_lineage_bigquery_mixed_case_alias_lookup() {
    let dialect = Dialect::get(DialectType::BigQuery);
    let statements = dialect.parse("SELECT Name AS Name FROM teams").unwrap();
    let expr = statements
        .into_iter()
        .next()
        .expect("expected one expression");

    let lookup = lineage("name", &expr, Some(DialectType::BigQuery), false);
    let root = lookup.expect("lineage should resolve mixed-case aliases in BigQuery");

    assert_eq!(root.name, "name");
}
2467
/// Regression test for issue 61: in Snowflake `DATEDIFF(day, ...)`, the `day` date part
/// must not be reported as a lineage column; only `date_utc` is a real source.
#[test]
fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
    let parsed = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    );
    let expr = parsed.expect("parse");

    let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
    schema
        .add_table(
            "fact.some_daily_metrics",
            &[("date_utc".to_string(), DataType::Date)],
            None,
        )
        .expect("schema setup");

    let root = lineage_with_schema(
        "recency",
        &expr,
        Some(&schema),
        Some(DialectType::Snowflake),
        false,
    )
    .expect("lineage_with_schema should not treat date part as a column");

    let downstream = root.downstream_names();

    // The real source column has to be present...
    let has_date_column = downstream
        .iter()
        .any(|name| name == "some_daily_metrics.date_utc");
    assert!(
        has_date_column,
        "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
        downstream
    );

    // ...and the date part keyword must not appear, qualified or bare.
    let leaks_date_part = downstream
        .iter()
        .any(|name| name.ends_with(".day") || name == "day");
    assert!(
        !leaks_date_part,
        "Did not expect date part to appear as lineage column, got: {:?}",
        downstream
    );
}
2506
/// Snowflake `DATEDIFF(day, ...)` must parse into an aliased `Expression::DateDiff`
/// whose `unit` is `IntervalUnit::Day`.
#[test]
fn test_snowflake_datediff_parses_to_typed_ast() {
    let parsed = parse_one(
        "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
        DialectType::Snowflake,
    );
    let expr = parsed.expect("parse");

    // Peel the AST one layer at a time: Select -> first projection (Alias) -> DateDiff.
    let select = match expr {
        Expression::Select(select) => select,
        other => panic!("expected Select, got {other:?}"),
    };

    let alias = match &select.expressions[0] {
        Expression::Alias(alias) => alias,
        other => panic!("expected Alias, got {other:?}"),
    };

    match &alias.this {
        Expression::DateDiff(f) => {
            assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
        }
        other => panic!("expected DateDiff, got {other:?}"),
    }
}
2528
2529    #[test]
2530    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2531        let expr = parse_one(
2532            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2533            DialectType::Snowflake,
2534        )
2535        .expect("parse");
2536
2537        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2538        schema
2539            .add_table(
2540                "fact.some_daily_metrics",
2541                &[("date_utc".to_string(), DataType::Date)],
2542                None,
2543            )
2544            .expect("schema setup");
2545
2546        let node = lineage_with_schema(
2547            "next_day",
2548            &expr,
2549            Some(&schema),
2550            Some(DialectType::Snowflake),
2551            false,
2552        )
2553        .expect("lineage_with_schema should not treat DATEADD date part as a column");
2554
2555        let names = node.downstream_names();
2556        assert!(
2557            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2558            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2559            names
2560        );
2561        assert!(
2562            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2563            "Did not expect date part to appear as lineage column, got: {:?}",
2564            names
2565        );
2566    }
2567
2568    #[test]
2569    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2570        let expr = parse_one(
2571            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2572            DialectType::Snowflake,
2573        )
2574        .expect("parse");
2575
2576        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2577        schema
2578            .add_table(
2579                "fact.some_daily_metrics",
2580                &[("date_utc".to_string(), DataType::Date)],
2581                None,
2582            )
2583            .expect("schema setup");
2584
2585        let node = lineage_with_schema(
2586            "day_part",
2587            &expr,
2588            Some(&schema),
2589            Some(DialectType::Snowflake),
2590            false,
2591        )
2592        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2593
2594        let names = node.downstream_names();
2595        assert!(
2596            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2597            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2598            names
2599        );
2600        assert!(
2601            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2602            "Did not expect date part to appear as lineage column, got: {:?}",
2603            names
2604        );
2605    }
2606
2607    #[test]
2608    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2609        let expr = parse_one(
2610            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2611            DialectType::Snowflake,
2612        )
2613        .expect("parse");
2614
2615        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2616        schema
2617            .add_table(
2618                "fact.some_daily_metrics",
2619                &[("date_utc".to_string(), DataType::Date)],
2620                None,
2621            )
2622            .expect("schema setup");
2623
2624        let node = lineage_with_schema(
2625            "day_part",
2626            &expr,
2627            Some(&schema),
2628            Some(DialectType::Snowflake),
2629            false,
2630        )
2631        .expect("quoted DATE_PART should continue to work");
2632
2633        let names = node.downstream_names();
2634        assert!(
2635            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2636            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2637            names
2638        );
2639    }
2640
2641    #[test]
2642    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2643        let expr = parse_one(
2644            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2645            DialectType::Snowflake,
2646        )
2647        .expect("parse");
2648
2649        match expr {
2650            Expression::Select(select) => match &select.expressions[0] {
2651                Expression::Alias(alias) => match &alias.this {
2652                    Expression::Function(f) => {
2653                        assert_eq!(f.name.to_uppercase(), "DATEADD");
2654                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2655                    }
2656                    other => panic!("expected generic DATEADD function, got {other:?}"),
2657                },
2658                other => panic!("expected Alias, got {other:?}"),
2659            },
2660            other => panic!("expected Select, got {other:?}"),
2661        }
2662    }
2663
2664    #[test]
2665    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2666        let expr = parse_one(
2667            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2668            DialectType::Snowflake,
2669        )
2670        .expect("parse");
2671
2672        match expr {
2673            Expression::Select(select) => match &select.expressions[0] {
2674                Expression::Alias(alias) => match &alias.this {
2675                    Expression::Function(f) => {
2676                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2677                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2678                    }
2679                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2680                },
2681                other => panic!("expected Alias, got {other:?}"),
2682            },
2683            other => panic!("expected Select, got {other:?}"),
2684        }
2685    }
2686
2687    #[test]
2688    fn test_snowflake_date_part_string_literal_stays_generic_function() {
2689        let expr = parse_one(
2690            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2691            DialectType::Snowflake,
2692        )
2693        .expect("parse");
2694
2695        match expr {
2696            Expression::Select(select) => match &select.expressions[0] {
2697                Expression::Alias(alias) => match &alias.this {
2698                    Expression::Function(f) => {
2699                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2700                    }
2701                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2702                },
2703                other => panic!("expected Alias, got {other:?}"),
2704            },
2705            other => panic!("expected Select, got {other:?}"),
2706        }
2707    }
2708
2709    #[test]
2710    fn test_lineage_join() {
2711        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2712
2713        let node_a = lineage("a", &expr, None, false).unwrap();
2714        let names_a = node_a.downstream_names();
2715        assert!(
2716            names_a.iter().any(|n| n == "t.a"),
2717            "Expected t.a, got: {:?}",
2718            names_a
2719        );
2720
2721        let node_b = lineage("b", &expr, None, false).unwrap();
2722        let names_b = node_b.downstream_names();
2723        assert!(
2724            names_b.iter().any(|n| n == "s.b"),
2725            "Expected s.b, got: {:?}",
2726            names_b
2727        );
2728    }
2729
2730    #[test]
2731    fn test_lineage_alias_leaf_has_resolved_source_name() {
2732        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2733        let node = lineage("col1", &expr, None, false).unwrap();
2734
2735        // Keep alias in the display lineage edge.
2736        let names = node.downstream_names();
2737        assert!(
2738            names.iter().any(|n| n == "t1.col1"),
2739            "Expected aliased column edge t1.col1, got: {:?}",
2740            names
2741        );
2742
2743        // Leaf should expose the resolved base table for consumers.
2744        let leaf = node
2745            .downstream
2746            .iter()
2747            .find(|n| n.name == "t1.col1")
2748            .expect("Expected t1.col1 leaf");
2749        assert_eq!(leaf.source_name, "table1");
2750        match &leaf.source {
2751            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2752            _ => panic!("Expected leaf source to be a table expression"),
2753        }
2754    }
2755
2756    #[test]
2757    fn test_lineage_derived_table() {
2758        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2759        let node = lineage("a", &expr, None, false).unwrap();
2760
2761        assert_eq!(node.name, "a");
2762        // Should trace through the derived table to t.a
2763        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2764        assert!(
2765            all_names.iter().any(|n| n == "t.a"),
2766            "Expected to trace through derived table to t.a, got: {:?}",
2767            all_names
2768        );
2769    }
2770
2771    #[test]
2772    fn test_lineage_cte() {
2773        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2774        let node = lineage("a", &expr, None, false).unwrap();
2775
2776        assert_eq!(node.name, "a");
2777        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2778        assert!(
2779            all_names.iter().any(|n| n == "t.a"),
2780            "Expected to trace through CTE to t.a, got: {:?}",
2781            all_names
2782        );
2783    }
2784
2785    #[test]
2786    fn test_lineage_union() {
2787        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2788        let node = lineage("a", &expr, None, false).unwrap();
2789
2790        assert_eq!(node.name, "a");
2791        // Should have 2 downstream branches
2792        assert_eq!(
2793            node.downstream.len(),
2794            2,
2795            "Expected 2 branches for UNION, got {}",
2796            node.downstream.len()
2797        );
2798    }
2799
2800    #[test]
2801    fn test_lineage_cte_union() {
2802        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2803        let node = lineage("a", &expr, None, false).unwrap();
2804
2805        // Should trace through CTE into both UNION branches
2806        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2807        assert!(
2808            all_names.len() >= 3,
2809            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2810            all_names
2811        );
2812    }
2813
2814    #[test]
2815    fn test_lineage_star() {
2816        let expr = parse("SELECT * FROM t");
2817        let node = lineage("*", &expr, None, false).unwrap();
2818
2819        assert_eq!(node.name, "*");
2820        // Should have downstream for table t
2821        assert!(
2822            !node.downstream.is_empty(),
2823            "Star should produce downstream nodes"
2824        );
2825    }
2826
2827    #[test]
2828    fn test_lineage_subquery_in_select() {
2829        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2830        let node = lineage("x", &expr, None, false).unwrap();
2831
2832        assert_eq!(node.name, "x");
2833        // Should have traced into the scalar subquery
2834        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2835        assert!(
2836            all_names.len() >= 2,
2837            "Expected tracing into scalar subquery, got: {:?}",
2838            all_names
2839        );
2840    }
2841
2842    #[test]
2843    fn test_lineage_multiple_columns() {
2844        let expr = parse("SELECT a, b FROM t");
2845
2846        let node_a = lineage("a", &expr, None, false).unwrap();
2847        let node_b = lineage("b", &expr, None, false).unwrap();
2848
2849        assert_eq!(node_a.name, "a");
2850        assert_eq!(node_b.name, "b");
2851
2852        // Each should trace independently
2853        let names_a = node_a.downstream_names();
2854        let names_b = node_b.downstream_names();
2855        assert!(names_a.iter().any(|n| n == "t.a"));
2856        assert!(names_b.iter().any(|n| n == "t.b"));
2857    }
2858
2859    #[test]
2860    fn test_get_source_tables() {
2861        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2862        let node = lineage("a", &expr, None, false).unwrap();
2863
2864        let tables = get_source_tables(&node);
2865        assert!(
2866            tables.contains("t"),
2867            "Expected source table 't', got: {:?}",
2868            tables
2869        );
2870    }
2871
2872    #[test]
2873    fn test_lineage_column_not_found() {
2874        let expr = parse("SELECT a FROM t");
2875        let result = lineage("nonexistent", &expr, None, false);
2876        assert!(result.is_err());
2877    }
2878
2879    #[test]
2880    fn test_lineage_nested_cte() {
2881        let expr = parse(
2882            "WITH cte1 AS (SELECT a FROM t), \
2883             cte2 AS (SELECT a FROM cte1) \
2884             SELECT a FROM cte2",
2885        );
2886        let node = lineage("a", &expr, None, false).unwrap();
2887
2888        // Should trace through cte2 → cte1 → t
2889        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2890        assert!(
2891            all_names.len() >= 3,
2892            "Expected to trace through nested CTEs, got: {:?}",
2893            all_names
2894        );
2895    }
2896
2897    #[test]
2898    fn test_trim_selects_true() {
2899        let expr = parse("SELECT a, b, c FROM t");
2900        let node = lineage("a", &expr, None, true).unwrap();
2901
2902        // The source should be trimmed to only include 'a'
2903        if let Expression::Select(select) = &node.source {
2904            assert_eq!(
2905                select.expressions.len(),
2906                1,
2907                "Trimmed source should have 1 expression, got {}",
2908                select.expressions.len()
2909            );
2910        } else {
2911            panic!("Expected Select source");
2912        }
2913    }
2914
2915    #[test]
2916    fn test_trim_selects_false() {
2917        let expr = parse("SELECT a, b, c FROM t");
2918        let node = lineage("a", &expr, None, false).unwrap();
2919
2920        // The source should keep all columns
2921        if let Expression::Select(select) = &node.source {
2922            assert_eq!(
2923                select.expressions.len(),
2924                3,
2925                "Untrimmed source should have 3 expressions"
2926            );
2927        } else {
2928            panic!("Expected Select source");
2929        }
2930    }
2931
2932    #[test]
2933    fn test_lineage_expression_in_select() {
2934        let expr = parse("SELECT a + b AS c FROM t");
2935        let node = lineage("c", &expr, None, false).unwrap();
2936
2937        // Should trace to both a and b from t
2938        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2939        assert!(
2940            all_names.len() >= 3,
2941            "Expected to trace a + b to both columns, got: {:?}",
2942            all_names
2943        );
2944    }
2945
2946    #[test]
2947    fn test_set_operation_by_index() {
2948        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2949
2950        // Trace column "a" which is at index 0
2951        let node = lineage("a", &expr, None, false).unwrap();
2952
2953        // UNION branches should be traced by index
2954        assert_eq!(node.downstream.len(), 2);
2955    }
2956
2957    // --- Tests for column lineage inside function calls (issue #18) ---
2958
2959    fn print_node(node: &LineageNode, indent: usize) {
2960        let pad = "  ".repeat(indent);
2961        println!(
2962            "{pad}name={:?} source_name={:?}",
2963            node.name, node.source_name
2964        );
2965        for child in &node.downstream {
2966            print_node(child, indent + 1);
2967        }
2968    }
2969
2970    #[test]
2971    fn test_issue18_repro() {
2972        // Exact scenario from the issue
2973        let query = "SELECT UPPER(name) as upper_name FROM users";
2974        println!("Query: {query}\n");
2975
2976        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2977        let exprs = dialect.parse(query).unwrap();
2978        let expr = &exprs[0];
2979
2980        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
2981        println!("lineage(\"upper_name\"):");
2982        print_node(&node, 1);
2983
2984        let names = node.downstream_names();
2985        assert!(
2986            names.iter().any(|n| n == "users.name"),
2987            "Expected users.name in downstream, got: {:?}",
2988            names
2989        );
2990    }
2991
2992    #[test]
2993    fn test_lineage_upper_function() {
2994        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
2995        let node = lineage("upper_name", &expr, None, false).unwrap();
2996
2997        let names = node.downstream_names();
2998        assert!(
2999            names.iter().any(|n| n == "users.name"),
3000            "Expected users.name in downstream, got: {:?}",
3001            names
3002        );
3003    }
3004
3005    #[test]
3006    fn test_lineage_round_function() {
3007        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
3008        let node = lineage("rounded", &expr, None, false).unwrap();
3009
3010        let names = node.downstream_names();
3011        assert!(
3012            names.iter().any(|n| n == "products.price"),
3013            "Expected products.price in downstream, got: {:?}",
3014            names
3015        );
3016    }
3017
3018    #[test]
3019    fn test_lineage_coalesce_function() {
3020        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3021        let node = lineage("val", &expr, None, false).unwrap();
3022
3023        let names = node.downstream_names();
3024        assert!(
3025            names.iter().any(|n| n == "t.a"),
3026            "Expected t.a in downstream, got: {:?}",
3027            names
3028        );
3029        assert!(
3030            names.iter().any(|n| n == "t.b"),
3031            "Expected t.b in downstream, got: {:?}",
3032            names
3033        );
3034    }
3035
3036    #[test]
3037    fn test_lineage_count_function() {
3038        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3039        let node = lineage("cnt", &expr, None, false).unwrap();
3040
3041        let names = node.downstream_names();
3042        assert!(
3043            names.iter().any(|n| n == "t.id"),
3044            "Expected t.id in downstream, got: {:?}",
3045            names
3046        );
3047    }
3048
3049    #[test]
3050    fn test_lineage_sum_function() {
3051        let expr = parse("SELECT SUM(amount) AS total FROM t");
3052        let node = lineage("total", &expr, None, false).unwrap();
3053
3054        let names = node.downstream_names();
3055        assert!(
3056            names.iter().any(|n| n == "t.amount"),
3057            "Expected t.amount in downstream, got: {:?}",
3058            names
3059        );
3060    }
3061
3062    #[test]
3063    fn test_lineage_case_with_nested_functions() {
3064        let expr =
3065            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3066        let node = lineage("result", &expr, None, false).unwrap();
3067
3068        let names = node.downstream_names();
3069        assert!(
3070            names.iter().any(|n| n == "t.x"),
3071            "Expected t.x in downstream, got: {:?}",
3072            names
3073        );
3074        assert!(
3075            names.iter().any(|n| n == "t.name"),
3076            "Expected t.name in downstream, got: {:?}",
3077            names
3078        );
3079    }
3080
3081    #[test]
3082    fn test_lineage_substring_function() {
3083        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3084        let node = lineage("short", &expr, None, false).unwrap();
3085
3086        let names = node.downstream_names();
3087        assert!(
3088            names.iter().any(|n| n == "t.name"),
3089            "Expected t.name in downstream, got: {:?}",
3090            names
3091        );
3092    }
3093
3094    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3095
3096    #[test]
3097    fn test_lineage_cte_select_star() {
3098        // Ported from sqlglot: test_lineage_source_with_star
3099        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3100        // After star expansion: SELECT y.a AS a FROM y
3101        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3102        let node = lineage("a", &expr, None, false).unwrap();
3103
3104        assert_eq!(node.name, "a");
3105        // Should successfully resolve column 'a' through the CTE
3106        // (previously failed with "Cannot find column 'a' in query")
3107        assert!(
3108            !node.downstream.is_empty(),
3109            "Expected downstream nodes tracing through CTE, got none"
3110        );
3111    }
3112
3113    #[test]
3114    fn test_lineage_cte_select_star_renamed_column() {
3115        // dbt standard pattern: CTE with column rename + outer SELECT *
3116        // This is the primary use case for dbt projects (jaffle-shop etc.)
3117        let expr =
3118            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3119        let node = lineage("customer_id", &expr, None, false).unwrap();
3120
3121        assert_eq!(node.name, "customer_id");
3122        // Should trace customer_id → renamed CTE → source.id
3123        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3124        assert!(
3125            all_names.len() >= 2,
3126            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3127            all_names
3128        );
3129    }
3130
3131    #[test]
3132    fn test_lineage_cte_select_star_multiple_columns() {
3133        // CTE exposes multiple columns, outer SELECT * should resolve each
3134        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3135
3136        for col in &["a", "b", "c"] {
3137            let node = lineage(col, &expr, None, false).unwrap();
3138            assert_eq!(node.name, *col);
3139            // Verify lineage resolves without error (star expanded to explicit columns)
3140            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3141            assert!(
3142                all_names.len() >= 2,
3143                "Expected at least 2 nodes for column {}, got: {:?}",
3144                col,
3145                all_names
3146            );
3147        }
3148    }
3149
3150    #[test]
3151    fn test_lineage_nested_cte_select_star() {
3152        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3153        let expr = parse(
3154            "WITH cte1 AS (SELECT a FROM t), \
3155             cte2 AS (SELECT * FROM cte1) \
3156             SELECT * FROM cte2",
3157        );
3158        let node = lineage("a", &expr, None, false).unwrap();
3159
3160        assert_eq!(node.name, "a");
3161        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3162        assert!(
3163            all_names.len() >= 3,
3164            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3165            all_names
3166        );
3167    }
3168
3169    #[test]
3170    fn test_lineage_three_level_nested_cte_star() {
3171        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3172        let expr = parse(
3173            "WITH cte1 AS (SELECT x FROM t), \
3174             cte2 AS (SELECT * FROM cte1), \
3175             cte3 AS (SELECT * FROM cte2) \
3176             SELECT * FROM cte3",
3177        );
3178        let node = lineage("x", &expr, None, false).unwrap();
3179
3180        assert_eq!(node.name, "x");
3181        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3182        assert!(
3183            all_names.len() >= 4,
3184            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3185            all_names
3186        );
3187    }
3188
3189    #[test]
3190    fn test_lineage_cte_union_star() {
3191        // CTE with UNION body, outer SELECT * should resolve from left branch
3192        let expr = parse(
3193            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3194             SELECT * FROM cte",
3195        );
3196        let node = lineage("a", &expr, None, false).unwrap();
3197
3198        assert_eq!(node.name, "a");
3199        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3200        assert!(
3201            all_names.len() >= 2,
3202            "Expected at least 2 nodes for CTE union star, got: {:?}",
3203            all_names
3204        );
3205    }
3206
3207    #[test]
3208    fn test_lineage_cte_star_unknown_table() {
3209        // When CTE references an unknown table, star expansion is skipped gracefully
3210        // and lineage falls back to normal resolution (which may fail)
3211        let expr = parse(
3212            "WITH cte AS (SELECT * FROM unknown_table) \
3213             SELECT * FROM cte",
3214        );
3215        // This should not panic — it may succeed or fail depending on resolution,
3216        // but should not crash
3217        let _result = lineage("x", &expr, None, false);
3218    }
3219
3220    #[test]
3221    fn test_lineage_cte_explicit_columns() {
3222        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3223        let expr = parse(
3224            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3225             SELECT * FROM cte",
3226        );
3227        let node = lineage("x", &expr, None, false).unwrap();
3228        assert_eq!(node.name, "x");
3229    }
3230
3231    #[test]
3232    fn test_lineage_cte_qualified_star() {
3233        // Qualified star: SELECT cte.* FROM cte
3234        let expr = parse(
3235            "WITH cte AS (SELECT a, b FROM t) \
3236             SELECT cte.* FROM cte",
3237        );
3238        for col in &["a", "b"] {
3239            let node = lineage(col, &expr, None, false).unwrap();
3240            assert_eq!(node.name, *col);
3241            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3242            assert!(
3243                all_names.len() >= 2,
3244                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3245                col,
3246                all_names
3247            );
3248        }
3249    }
3250
3251    #[test]
3252    fn test_lineage_subquery_select_star() {
3253        // Ported from sqlglot: test_select_star
3254        // SELECT x FROM (SELECT * FROM table_a)
3255        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3256        let node = lineage("x", &expr, None, false).unwrap();
3257
3258        assert_eq!(node.name, "x");
3259        assert!(
3260            !node.downstream.is_empty(),
3261            "Expected downstream nodes for subquery with SELECT *, got none"
3262        );
3263    }
3264
3265    #[test]
3266    fn test_lineage_cte_star_with_schema_external_table() {
3267        // CTE references an external table via SELECT * — schema enables expansion
3268        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3269SELECT * FROM orders"#;
3270        let expr = parse(sql);
3271
3272        let mut schema = MappingSchema::new();
3273        let cols = vec![
3274            ("order_id".to_string(), DataType::Unknown),
3275            ("customer_id".to_string(), DataType::Unknown),
3276            ("amount".to_string(), DataType::Unknown),
3277        ];
3278        schema.add_table("stg_orders", &cols, None).unwrap();
3279
3280        let node =
3281            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3282                .unwrap();
3283        assert_eq!(node.name, "order_id");
3284    }
3285
3286    #[test]
3287    fn test_lineage_cte_star_with_schema_three_part_name() {
3288        // CTE references an external table with fully-qualified 3-part name
3289        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3290SELECT * FROM orders"#;
3291        let expr = parse(sql);
3292
3293        let mut schema = MappingSchema::new();
3294        let cols = vec![
3295            ("order_id".to_string(), DataType::Unknown),
3296            ("customer_id".to_string(), DataType::Unknown),
3297        ];
3298        schema
3299            .add_table("db.schema.stg_orders", &cols, None)
3300            .unwrap();
3301
3302        let node = lineage_with_schema(
3303            "customer_id",
3304            &expr,
3305            Some(&schema as &dyn Schema),
3306            None,
3307            false,
3308        )
3309        .unwrap();
3310        assert_eq!(node.name, "customer_id");
3311    }
3312
3313    #[test]
3314    fn test_lineage_cte_star_with_schema_nested() {
3315        // Nested CTEs: outer CTE references inner CTE with SELECT *,
3316        // inner CTE references external table with SELECT *
3317        let sql = r#"WITH
3318            raw AS (SELECT * FROM external_table),
3319            enriched AS (SELECT * FROM raw)
3320        SELECT * FROM enriched"#;
3321        let expr = parse(sql);
3322
3323        let mut schema = MappingSchema::new();
3324        let cols = vec![
3325            ("id".to_string(), DataType::Unknown),
3326            ("name".to_string(), DataType::Unknown),
3327        ];
3328        schema.add_table("external_table", &cols, None).unwrap();
3329
3330        let node =
3331            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3332        assert_eq!(node.name, "name");
3333    }
3334
3335    #[test]
3336    fn test_lineage_cte_qualified_star_with_schema() {
3337        // CTE uses qualified star (orders.*) from a CTE whose columns
3338        // come from an external table via SELECT *
3339        let sql = r#"WITH
3340            orders AS (SELECT * FROM stg_orders),
3341            enriched AS (
3342                SELECT orders.*, 'extra' AS extra
3343                FROM orders
3344            )
3345        SELECT * FROM enriched"#;
3346        let expr = parse(sql);
3347
3348        let mut schema = MappingSchema::new();
3349        let cols = vec![
3350            ("order_id".to_string(), DataType::Unknown),
3351            ("total".to_string(), DataType::Unknown),
3352        ];
3353        schema.add_table("stg_orders", &cols, None).unwrap();
3354
3355        let node =
3356            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3357                .unwrap();
3358        assert_eq!(node.name, "order_id");
3359
3360        // Also verify the extra column works
3361        let extra =
3362            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
3363        assert_eq!(extra.name, "extra");
3364    }
3365
3366    #[test]
3367    fn test_lineage_cte_star_without_schema_still_works() {
3368        // Without schema, CTE-to-CTE star expansion still works
3369        let sql = r#"WITH
3370            cte1 AS (SELECT id, name FROM raw_table),
3371            cte2 AS (SELECT * FROM cte1)
3372        SELECT * FROM cte2"#;
3373        let expr = parse(sql);
3374
3375        // No schema — should still resolve through CTE chain
3376        let node = lineage("id", &expr, None, false).unwrap();
3377        assert_eq!(node.name, "id");
3378    }
3379
3380    #[test]
3381    fn test_lineage_nested_cte_star_with_join_and_schema() {
3382        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
3383        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
3384        let sql = r#"WITH
3385base_orders AS (
3386    SELECT * FROM stg_orders
3387),
3388with_payments AS (
3389    SELECT
3390        base_orders.*,
3391        p.amount
3392    FROM base_orders
3393    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3394),
3395final_cte AS (
3396    SELECT * FROM with_payments
3397)
3398SELECT * FROM final_cte"#;
3399        let expr = parse(sql);
3400
3401        let mut schema = MappingSchema::new();
3402        let order_cols = vec![
3403            (
3404                "order_id".to_string(),
3405                crate::expressions::DataType::Unknown,
3406            ),
3407            (
3408                "customer_id".to_string(),
3409                crate::expressions::DataType::Unknown,
3410            ),
3411            ("status".to_string(), crate::expressions::DataType::Unknown),
3412        ];
3413        let pay_cols = vec![
3414            (
3415                "payment_id".to_string(),
3416                crate::expressions::DataType::Unknown,
3417            ),
3418            (
3419                "order_id".to_string(),
3420                crate::expressions::DataType::Unknown,
3421            ),
3422            ("amount".to_string(), crate::expressions::DataType::Unknown),
3423        ];
3424        schema.add_table("stg_orders", &order_cols, None).unwrap();
3425        schema.add_table("stg_payments", &pay_cols, None).unwrap();
3426
3427        // order_id should trace back to stg_orders
3428        let node =
3429            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
3430                .unwrap();
3431        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3432
3433        // The leaf should be "stg_orders.order_id" (not just "order_id")
3434        let has_table_qualified = all_names
3435            .iter()
3436            .any(|n| n.contains('.') && n.contains("order_id"));
3437        assert!(
3438            has_table_qualified,
3439            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
3440            all_names
3441        );
3442
3443        // amount should trace back to stg_payments
3444        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
3445            .unwrap();
3446        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3447
3448        let has_table_qualified = all_names
3449            .iter()
3450            .any(|n| n.contains('.') && n.contains("amount"));
3451        assert!(
3452            has_table_qualified,
3453            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
3454            all_names
3455        );
3456    }
3457
3458    #[test]
3459    fn test_lineage_cte_alias_resolution() {
3460        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
3461        let sql = r#"WITH import_stg_items AS (
3462    SELECT item_id, name, status FROM stg_items
3463)
3464SELECT base.item_id, base.status
3465FROM import_stg_items AS base"#;
3466        let expr = parse(sql);
3467
3468        let node = lineage("item_id", &expr, None, false).unwrap();
3469        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3470        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
3471        assert!(
3472            all_names.iter().any(|n| n == "stg_items.item_id"),
3473            "Expected leaf 'stg_items.item_id', got: {:?}",
3474            all_names
3475        );
3476    }
3477
3478    #[test]
3479    fn test_lineage_cte_alias_with_schema_and_star() {
3480        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
3481        let sql = r#"WITH import_stg AS (
3482    SELECT * FROM stg_items
3483)
3484SELECT base.item_id, base.status
3485FROM import_stg AS base"#;
3486        let expr = parse(sql);
3487
3488        let mut schema = MappingSchema::new();
3489        schema
3490            .add_table(
3491                "stg_items",
3492                &[
3493                    ("item_id".to_string(), DataType::Unknown),
3494                    ("name".to_string(), DataType::Unknown),
3495                    ("status".to_string(), DataType::Unknown),
3496                ],
3497                None,
3498            )
3499            .unwrap();
3500
3501        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
3502            .unwrap();
3503        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3504        assert!(
3505            all_names.iter().any(|n| n == "stg_items.item_id"),
3506            "Expected leaf 'stg_items.item_id', got: {:?}",
3507            all_names
3508        );
3509    }
3510
3511    #[test]
3512    fn test_lineage_cte_alias_with_join() {
3513        // Multiple CTE aliases in a JOIN: each should resolve independently
3514        let sql = r#"WITH
3515    import_users AS (SELECT id, name FROM users),
3516    import_orders AS (SELECT id, user_id, amount FROM orders)
3517SELECT u.name, o.amount
3518FROM import_users AS u
3519LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3520        let expr = parse(sql);
3521
3522        let node = lineage("name", &expr, None, false).unwrap();
3523        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3524        assert!(
3525            all_names.iter().any(|n| n == "users.name"),
3526            "Expected leaf 'users.name', got: {:?}",
3527            all_names
3528        );
3529
3530        let node = lineage("amount", &expr, None, false).unwrap();
3531        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3532        assert!(
3533            all_names.iter().any(|n| n == "orders.amount"),
3534            "Expected leaf 'orders.amount', got: {:?}",
3535            all_names
3536        );
3537    }
3538
3539    // -----------------------------------------------------------------------
3540    // Quoted CTE name tests — verifying SQL identifier case semantics
3541    // -----------------------------------------------------------------------
3542
3543    #[test]
3544    fn test_lineage_unquoted_cte_case_insensitive() {
3545        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
3546        // MyCte and MYCTE should match.
3547        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
3548        let node = lineage("col", &expr, None, false).unwrap();
3549        assert_eq!(node.name, "col");
3550        assert!(
3551            !node.downstream.is_empty(),
3552            "Unquoted CTE should resolve case-insensitively"
3553        );
3554    }
3555
3556    #[test]
3557    fn test_lineage_quoted_cte_case_preserved() {
3558        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
3559        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
3560        let node = lineage("col", &expr, None, false).unwrap();
3561        assert_eq!(node.name, "col");
3562        assert!(
3563            !node.downstream.is_empty(),
3564            "Quoted CTE with matching case should resolve"
3565        );
3566    }
3567
3568    #[test]
3569    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3570        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
3571        // sqlglot treats this as a table reference, not a CTE match.
3572        // Star expansion should NOT resolve through the CTE.
3573        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
3574        // lineage("col", ...) should fail because "mycte" is treated as an external
3575        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
3576        let result = lineage("col", &expr, None, false);
3577        assert!(
3578            result.is_err(),
3579            "Quoted CTE with case mismatch should not expand star: {:?}",
3580            result
3581        );
3582    }
3583
3584    #[test]
3585    fn test_lineage_mixed_quoted_unquoted_cte() {
3586        // Mix of unquoted and quoted CTEs in a nested chain.
3587        let expr = parse(
3588            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3589        );
3590        let node = lineage("a", &expr, None, false).unwrap();
3591        assert_eq!(node.name, "a");
3592        assert!(
3593            !node.downstream.is_empty(),
3594            "Mixed quoted/unquoted CTE chain should resolve"
3595        );
3596    }
3597
3598    // -----------------------------------------------------------------------
3599    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
3600    // -----------------------------------------------------------------------
3601    //
3602    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
3603    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
3604    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
3605    // direct string comparison for CTE name matching, ignoring the quoted status.
3606    //
3607    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
3608    // and unquoted as case-insensitive. The scope system should do the same.
3609    //
3610    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
3611    // which is broader than the star expansion scope of this PR.
3612
3613    #[test]
3614    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3615        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
3616        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
3617        // as "mycte" incorrectly resolves to the CTE.
3618        //
3619        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
3620        // case-sensitive: "mycte" should NOT match CTE "MyCte".
3621        //
3622        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3623        // this test should fail — update the assertion to match correct behavior:
3624        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
3625        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
3626        let node = lineage("col", &expr, None, false).unwrap();
3627        assert!(!node.downstream.is_empty());
3628        let child = &node.downstream[0];
3629        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
3630        assert_eq!(
3631            child.source_name, "MyCte",
3632            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3633             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3634        );
3635    }
3636
3637    #[test]
3638    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3639        // Known bug: same as above but with qualified column reference ("mycte".col).
3640        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
3641        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
3642        //
3643        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3644        // this test should fail — update to assert source_name != "MyCte".
3645        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
3646        let node = lineage("col", &expr, None, false).unwrap();
3647        assert!(!node.downstream.is_empty());
3648        let child = &node.downstream[0];
3649        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
3650        assert_eq!(
3651            child.source_name, "MyCte",
3652            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3653             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3654        );
3655    }
3656
3657    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
3658
3659    /// sqlglot: test_node_name_doesnt_contain_comment
3660    /// Comments in column expressions should not affect lineage resolution.
3661    /// NOTE: This test uses SELECT * from a derived table, which is a separate
3662    /// known limitation in polyglot-sql (star expansion in subqueries).
3663    #[test]
3664    #[ignore = "requires derived table star expansion (separate issue)"]
3665    fn test_node_name_doesnt_contain_comment() {
3666        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3667        let node = lineage("x", &expr, None, false).unwrap();
3668
3669        assert_eq!(node.name, "x");
3670        assert!(!node.downstream.is_empty());
3671    }
3672
3673    /// A line comment between SELECT and the first column wraps the column
3674    /// in an Annotated node. Lineage must unwrap it to find the column name.
3675    /// Verify that commented and uncommented queries produce identical lineage.
3676    #[test]
3677    fn test_comment_before_first_column_in_cte() {
3678        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
3679        let sql_without_comment = "with t as (select 1 as a) select a from t";
3680
3681        // Without comment — baseline
3682        let expr_ok = parse(sql_without_comment);
3683        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3684
3685        // With comment — should produce identical lineage
3686        let expr_comment = parse(sql_with_comment);
3687        let node_comment = lineage("a", &expr_comment, None, false)
3688            .expect("with comment before first column should succeed");
3689
3690        assert_eq!(node_ok.name, node_comment.name, "node names should match");
3691        assert_eq!(
3692            node_ok.downstream_names(),
3693            node_comment.downstream_names(),
3694            "downstream lineage should be identical with or without comment"
3695        );
3696    }
3697
3698    /// Block comment between SELECT and first column.
3699    #[test]
3700    fn test_block_comment_before_first_column() {
3701        let sql = "with t as (select 1 as a) select /* section */ a from t";
3702        let expr = parse(sql);
3703        let node = lineage("a", &expr, None, false)
3704            .expect("block comment before first column should succeed");
3705        assert_eq!(node.name, "a");
3706        assert!(
3707            !node.downstream.is_empty(),
3708            "should have downstream lineage"
3709        );
3710    }
3711
3712    /// Comment before first column should not affect second column resolution.
3713    #[test]
3714    fn test_comment_before_first_column_second_col_ok() {
3715        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
3716        let expr = parse(sql);
3717
3718        let node_a =
3719            lineage("a", &expr, None, false).expect("column a with comment should succeed");
3720        assert_eq!(node_a.name, "a");
3721
3722        let node_b =
3723            lineage("b", &expr, None, false).expect("column b with comment should succeed");
3724        assert_eq!(node_b.name, "b");
3725    }
3726
3727    /// Aliased column with preceding comment.
3728    #[test]
3729    fn test_comment_before_aliased_column() {
3730        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
3731        let expr = parse(sql);
3732        let node =
3733            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3734        assert_eq!(node.name, "y");
3735        assert!(
3736            !node.downstream.is_empty(),
3737            "aliased column should have downstream lineage"
3738        );
3739    }
3740}