Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, Select};
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18
19/// A node in the column lineage graph
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct LineageNode {
22    /// Name of this lineage step (e.g., "table.column")
23    pub name: String,
24    /// The expression at this node
25    pub expression: Expression,
26    /// The source expression (the full query context)
27    pub source: Expression,
28    /// Downstream nodes that depend on this one
29    pub downstream: Vec<LineageNode>,
30    /// Optional source name (e.g., for derived tables)
31    pub source_name: String,
32    /// Optional reference node name (e.g., for CTEs)
33    pub reference_node_name: String,
34}
35
36impl LineageNode {
37    /// Create a new lineage node
38    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39        Self {
40            name: name.into(),
41            expression,
42            source,
43            downstream: Vec::new(),
44            source_name: String::new(),
45            reference_node_name: String::new(),
46        }
47    }
48
49    /// Iterate over all nodes in the lineage graph using DFS
50    pub fn walk(&self) -> LineageWalker<'_> {
51        LineageWalker { stack: vec![self] }
52    }
53
54    /// Get all downstream column names
55    pub fn downstream_names(&self) -> Vec<String> {
56        self.downstream.iter().map(|n| n.name.clone()).collect()
57    }
58}
59
60/// Iterator for walking the lineage graph
61pub struct LineageWalker<'a> {
62    stack: Vec<&'a LineageNode>,
63}
64
65impl<'a> Iterator for LineageWalker<'a> {
66    type Item = &'a LineageNode;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        if let Some(node) = self.stack.pop() {
70            // Add children in reverse order so they're visited in order
71            for child in node.downstream.iter().rev() {
72                self.stack.push(child);
73            }
74            Some(node)
75        } else {
76            None
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// ColumnRef: name or positional index for column lookup
83// ---------------------------------------------------------------------------
84
85/// Column reference for lineage tracing — by name or positional index.
86enum ColumnRef<'a> {
87    Name(&'a str),
88    Index(usize),
89}
90
91// ---------------------------------------------------------------------------
92// Public API
93// ---------------------------------------------------------------------------
94
95/// Build the lineage graph for a column in a SQL query
96///
97/// # Arguments
98/// * `column` - The column name to trace lineage for
99/// * `sql` - The SQL expression (SELECT, UNION, etc.)
100/// * `dialect` - Optional dialect for parsing
101/// * `trim_selects` - If true, trim the source SELECT to only include the target column
102///
103/// # Returns
104/// The root lineage node for the specified column
105///
106/// # Example
107/// ```ignore
108/// use polyglot_sql::lineage::lineage;
109/// use polyglot_sql::parse_one;
110/// use polyglot_sql::DialectType;
111///
112/// let sql = "SELECT a, b + 1 AS c FROM t";
113/// let expr = parse_one(sql, DialectType::Generic).unwrap();
114/// let node = lineage("c", &expr, None, false).unwrap();
115/// ```
116pub fn lineage(
117    column: &str,
118    sql: &Expression,
119    dialect: Option<DialectType>,
120    trim_selects: bool,
121) -> Result<LineageNode> {
122    // Fast path: skip clone when there are no CTEs to expand
123    let has_with = matches!(sql, Expression::Select(s) if s.with.is_some());
124    if !has_with {
125        return lineage_from_expression(column, sql, dialect, trim_selects);
126    }
127    let mut owned = sql.clone();
128    expand_cte_stars(&mut owned, None);
129    lineage_from_expression(column, &owned, dialect, trim_selects)
130}
131
132/// Build the lineage graph for a column in a SQL query using optional schema metadata.
133///
134/// When `schema` is provided, the query is first qualified with
135/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
136/// ambiguous column references.
137///
138/// # Arguments
139/// * `column` - The column name to trace lineage for
140/// * `sql` - The SQL expression (SELECT, UNION, etc.)
141/// * `schema` - Optional schema used for qualification
142/// * `dialect` - Optional dialect for qualification and lineage handling
143/// * `trim_selects` - If true, trim the source SELECT to only include the target column
144///
145/// # Returns
146/// The root lineage node for the specified column
147pub fn lineage_with_schema(
148    column: &str,
149    sql: &Expression,
150    schema: Option<&dyn Schema>,
151    dialect: Option<DialectType>,
152    trim_selects: bool,
153) -> Result<LineageNode> {
154    let mut qualified_expression = if let Some(schema) = schema {
155        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
156            QualifyColumnsOptions::new().with_dialect(dialect_type)
157        } else {
158            QualifyColumnsOptions::new()
159        };
160
161        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
162            Error::internal(format!("Lineage qualification failed with schema: {}", e))
163        })?
164    } else {
165        sql.clone()
166    };
167
168    // Annotate types in-place so lineage nodes carry type information
169    annotate_types(&mut qualified_expression, schema, dialect);
170
171    // Expand CTE stars on the already-owned expression (no extra clone).
172    // Pass schema so that stars from external tables can also be resolved.
173    expand_cte_stars(&mut qualified_expression, schema);
174
175    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
176}
177
178fn lineage_from_expression(
179    column: &str,
180    sql: &Expression,
181    dialect: Option<DialectType>,
182    trim_selects: bool,
183) -> Result<LineageNode> {
184    let scope = build_scope(sql);
185    to_node(
186        ColumnRef::Name(column),
187        &scope,
188        dialect,
189        "",
190        "",
191        "",
192        trim_selects,
193    )
194}
195
196// ---------------------------------------------------------------------------
197// CTE star expansion
198// ---------------------------------------------------------------------------
199
200/// Normalize an identifier for CTE name matching.
201///
202/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
203/// quoted identifiers preserve their original case. This matches sqlglot's
204/// `normalize_identifiers` behavior.
205fn normalize_cte_name(ident: &Identifier) -> String {
206    if ident.quoted {
207        ident.name.clone()
208    } else {
209        ident.name.to_lowercase()
210    }
211}
212
213/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
214/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
215/// which qualify_columns cannot resolve because it processes each SELECT independently.
216///
217/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
218/// by looking up column names in the schema. This enables correct expansion of patterns
219/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
220///
221/// CTE name matching follows SQL identifier semantics: unquoted names are compared
222/// case-insensitively (lowercased), while quoted names preserve their original case.
223/// This matches sqlglot's `normalize_identifiers` behavior.
224pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
225    let select = match expr {
226        Expression::Select(s) => s,
227        _ => return,
228    };
229
230    let with = match &mut select.with {
231        Some(w) => w,
232        None => return,
233    };
234
235    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
236
237    for cte in &mut with.ctes {
238        let cte_name = normalize_cte_name(&cte.alias);
239
240        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
241        if !cte.columns.is_empty() {
242            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
243            resolved_cte_columns.insert(cte_name, cols);
244            continue;
245        }
246
247        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
248        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
249        // references itself. We detect this conservatively: if the CTE name appears as
250        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
251        // block are still expanded.
252        if with.recursive {
253            let is_self_referencing = if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
254                let body_sources = get_select_sources(body_select);
255                body_sources.iter().any(|s| s.normalized == cte_name)
256            } else {
257                false
258            };
259            if is_self_referencing {
260                continue;
261            }
262        }
263
264        // Get the SELECT from the CTE body (handle UNION by taking left branch)
265        let body_select = match get_leftmost_select_mut(&mut cte.this) {
266            Some(s) => s,
267            None => continue,
268        };
269
270        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
271        resolved_cte_columns.insert(cte_name, columns);
272    }
273
274    // Also expand stars in the outer SELECT itself
275    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
276}
277
278/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
279///
280/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
281/// are determined by the left branch. This matches sqlglot's behavior.
282fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
283    let mut current = expr;
284    for _ in 0..MAX_LINEAGE_DEPTH {
285        match current {
286            Expression::Select(s) => return Some(s),
287            Expression::Union(u) => current = &mut u.left,
288            Expression::Intersect(i) => current = &mut i.left,
289            Expression::Except(e) => current = &mut e.left,
290            Expression::Paren(p) => current = &mut p.this,
291            _ => return None,
292        }
293    }
294    None
295}
296
297/// Rewrite star expressions in a SELECT using resolved CTE column lists.
298/// Falls back to `schema` for external table column lookup.
299/// Returns the list of output column names after expansion.
300fn rewrite_stars_in_select(
301    select: &mut Select,
302    resolved_ctes: &HashMap<String, Vec<String>>,
303    schema: Option<&dyn Schema>,
304) -> Vec<String> {
305    // The AST represents star expressions in two forms depending on syntax:
306    //   - `SELECT *`      → Expression::Star (unqualified star)
307    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
308    // Both must be checked to handle all star patterns.
309    let has_star = select
310        .expressions
311        .iter()
312        .any(|e| matches!(e, Expression::Star(_)));
313    let has_qualified_star = select
314        .expressions
315        .iter()
316        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
317
318    if !has_star && !has_qualified_star {
319        // No stars — just extract column names without rewriting
320        return select
321            .expressions
322            .iter()
323            .filter_map(get_expression_output_name)
324            .collect();
325    }
326
327    let sources = get_select_sources(select);
328    let mut new_expressions = Vec::new();
329    let mut result_columns = Vec::new();
330
331    for expr in &select.expressions {
332        match expr {
333            Expression::Star(star) => {
334                let qual = star.table.as_ref();
335                if let Some(expanded) = expand_star_from_sources(
336                    qual,
337                    &sources,
338                    resolved_ctes,
339                    schema,
340                ) {
341                    for (src_alias, col_name) in &expanded {
342                        let table_id = Identifier::new(src_alias);
343                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
344                        result_columns.push(col_name.clone());
345                    }
346                } else {
347                    new_expressions.push(expr.clone());
348                    result_columns.push("*".to_string());
349                }
350            }
351            Expression::Column(c) if c.name.name == "*" => {
352                let qual = c.table.as_ref();
353                if let Some(expanded) = expand_star_from_sources(
354                    qual,
355                    &sources,
356                    resolved_ctes,
357                    schema,
358                ) {
359                    for (_src_alias, col_name) in &expanded {
360                        // Keep the original table qualifier for qualified stars (table.*)
361                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
362                        result_columns.push(col_name.clone());
363                    }
364                } else {
365                    new_expressions.push(expr.clone());
366                    result_columns.push("*".to_string());
367                }
368            }
369            _ => {
370                new_expressions.push(expr.clone());
371                if let Some(name) = get_expression_output_name(expr) {
372                    result_columns.push(name);
373                }
374            }
375        }
376    }
377
378    select.expressions = new_expressions;
379    result_columns
380}
381
382/// Try to expand a star expression by looking up source columns from resolved CTEs,
383/// falling back to the schema for external tables.
384/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
385/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
386fn expand_star_from_sources(
387    qualifier: Option<&Identifier>,
388    sources: &[SourceInfo],
389    resolved_ctes: &HashMap<String, Vec<String>>,
390    schema: Option<&dyn Schema>,
391) -> Option<Vec<(String, String)>> {
392    let mut expanded = Vec::new();
393
394    if let Some(qual) = qualifier {
395        // Qualified star: table.*
396        let qual_normalized = normalize_cte_name(qual);
397        for src in sources {
398            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
399                // Try CTE first
400                if let Some(cols) = resolved_ctes.get(&src.normalized) {
401                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
402                    return Some(expanded);
403                }
404                // Fall back to schema
405                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
406                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
407                    return Some(expanded);
408                }
409            }
410        }
411        None
412    } else {
413        // Unqualified star: expand all sources.
414        // Intentionally conservative: if any source can't be resolved, the entire
415        // expansion is aborted. Partial expansion would produce an incomplete column
416        // list, causing downstream lineage resolution to silently omit columns.
417        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
418        let mut any_expanded = false;
419        for src in sources {
420            if let Some(cols) = resolved_ctes.get(&src.normalized) {
421                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
422                any_expanded = true;
423            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
424                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
425                any_expanded = true;
426            } else {
427                return None;
428            }
429        }
430        if any_expanded {
431            Some(expanded)
432        } else {
433            None
434        }
435    }
436}
437
438/// Look up column names for a table from the schema.
439fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
440    let schema = schema?;
441    if fq_name.is_empty() {
442        return None;
443    }
444    schema.column_names(fq_name).ok().filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
445}
446
447/// Create a Column expression with the given name and optional table qualifier.
448fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
449    Expression::Column(Box::new(crate::expressions::Column {
450        name: Identifier::new(name),
451        table: table.cloned(),
452        join_mark: false,
453        trailing_comments: Vec::new(),
454        span: None,
455        inferred_type: None,
456    }))
457}
458
459/// Extract the output name of a SELECT expression.
460fn get_expression_output_name(expr: &Expression) -> Option<String> {
461    match expr {
462        Expression::Alias(a) => Some(a.alias.name.clone()),
463        Expression::Column(c) => Some(c.name.name.clone()),
464        Expression::Identifier(id) => Some(id.name.clone()),
465        Expression::Star(_) => Some("*".to_string()),
466        _ => None,
467    }
468}
469
470/// Source info extracted from a SELECT's FROM/JOIN clauses in a single pass.
471struct SourceInfo {
472    alias: String,
473    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
474    normalized: String,
475    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
476    fq_name: String,
477}
478
479/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
480/// SELECT's FROM and JOIN clauses in a single pass.
481fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
482    let mut sources = Vec::new();
483
484    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
485        match expr {
486            Expression::Table(t) => {
487                let normalized = normalize_cte_name(&t.name);
488                let alias = t
489                    .alias
490                    .as_ref()
491                    .map(|a| a.name.clone())
492                    .unwrap_or_else(|| t.name.name.clone());
493                let mut parts = Vec::new();
494                if let Some(catalog) = &t.catalog {
495                    parts.push(catalog.name.clone());
496                }
497                if let Some(schema) = &t.schema {
498                    parts.push(schema.name.clone());
499                }
500                parts.push(t.name.name.clone());
501                let fq_name = parts.join(".");
502                Some(SourceInfo { alias, normalized, fq_name })
503            }
504            Expression::Subquery(s) => {
505                let alias = s.alias.as_ref()?.name.clone();
506                let normalized = alias.to_lowercase();
507                let fq_name = alias.clone();
508                Some(SourceInfo { alias, normalized, fq_name })
509            }
510            Expression::Paren(p) => extract_source(&p.this),
511            _ => None,
512        }
513    }
514
515    if let Some(from) = &select.from {
516        for expr in &from.expressions {
517            if let Some(info) = extract_source(expr) {
518                sources.push(info);
519            }
520        }
521    }
522    for join in &select.joins {
523        if let Some(info) = extract_source(&join.this) {
524            sources.push(info);
525        }
526    }
527    sources
528}
529
530/// Get all source tables from a lineage graph
531pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
532    let mut tables = HashSet::new();
533    collect_source_tables(node, &mut tables);
534    tables
535}
536
537/// Recursively collect source table names from lineage graph
538pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
539    if let Expression::Table(table) = &node.source {
540        tables.insert(table.name.name.clone());
541    }
542    for child in &node.downstream {
543        collect_source_tables(child, tables);
544    }
545}
546
547// ---------------------------------------------------------------------------
548// Core recursive lineage builder
549// ---------------------------------------------------------------------------
550
551/// Maximum recursion depth for lineage tracing to prevent stack overflow
552/// on circular or deeply nested CTE chains.
553const MAX_LINEAGE_DEPTH: usize = 64;
554
555/// Recursively build a lineage node for a column in a scope.
556fn to_node(
557    column: ColumnRef<'_>,
558    scope: &Scope,
559    dialect: Option<DialectType>,
560    scope_name: &str,
561    source_name: &str,
562    reference_node_name: &str,
563    trim_selects: bool,
564) -> Result<LineageNode> {
565    to_node_inner(
566        column,
567        scope,
568        dialect,
569        scope_name,
570        source_name,
571        reference_node_name,
572        trim_selects,
573        &[],
574        0,
575    )
576}
577
578fn to_node_inner(
579    column: ColumnRef<'_>,
580    scope: &Scope,
581    dialect: Option<DialectType>,
582    scope_name: &str,
583    source_name: &str,
584    reference_node_name: &str,
585    trim_selects: bool,
586    ancestor_cte_scopes: &[Scope],
587    depth: usize,
588) -> Result<LineageNode> {
589    if depth > MAX_LINEAGE_DEPTH {
590        return Err(Error::internal(format!(
591            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
592        )));
593    }
594    let scope_expr = &scope.expression;
595
596    // Build combined CTE scopes: current scope's cte_scopes + ancestors
597    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
598    for s in ancestor_cte_scopes {
599        all_cte_scopes.push(s);
600    }
601
602    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
603    //    but we need the inner query (SELECT/UNION) for column lookup.
604    let effective_expr = match scope_expr {
605        Expression::Cte(cte) => &cte.this,
606        other => other,
607    };
608
609    // 1. Set operations (UNION / INTERSECT / EXCEPT)
610    if matches!(
611        effective_expr,
612        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
613    ) {
614        // For CTE wrapping a set op, create a temporary scope with the inner expression
615        if matches!(scope_expr, Expression::Cte(_)) {
616            let mut inner_scope = Scope::new(effective_expr.clone());
617            inner_scope.union_scopes = scope.union_scopes.clone();
618            inner_scope.sources = scope.sources.clone();
619            inner_scope.cte_sources = scope.cte_sources.clone();
620            inner_scope.cte_scopes = scope.cte_scopes.clone();
621            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
622            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
623            return handle_set_operation(
624                &column,
625                &inner_scope,
626                dialect,
627                scope_name,
628                source_name,
629                reference_node_name,
630                trim_selects,
631                ancestor_cte_scopes,
632                depth,
633            );
634        }
635        return handle_set_operation(
636            &column,
637            scope,
638            dialect,
639            scope_name,
640            source_name,
641            reference_node_name,
642            trim_selects,
643            ancestor_cte_scopes,
644            depth,
645        );
646    }
647
648    // 2. Find the select expression for this column
649    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
650    let column_name = resolve_column_name(&column, &select_expr);
651
652    // 3. Trim source if requested
653    let node_source = if trim_selects {
654        trim_source(effective_expr, &select_expr)
655    } else {
656        effective_expr.clone()
657    };
658
659    // 4. Create the lineage node
660    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
661    node.source_name = source_name.to_string();
662    node.reference_node_name = reference_node_name.to_string();
663
664    // 5. Star handling — add downstream for each source
665    if matches!(&select_expr, Expression::Star(_)) {
666        for (name, source_info) in &scope.sources {
667            let child = LineageNode::new(
668                format!("{}.*", name),
669                Expression::Star(crate::expressions::Star {
670                    table: None,
671                    except: None,
672                    replace: None,
673                    rename: None,
674                    trailing_comments: vec![],
675                    span: None,
676                }),
677                source_info.expression.clone(),
678            );
679            node.downstream.push(child);
680        }
681        return Ok(node);
682    }
683
684    // 6. Subqueries in select — trace through scalar subqueries
685    let subqueries: Vec<&Expression> =
686        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
687    for sq_expr in subqueries {
688        if let Expression::Subquery(sq) = sq_expr {
689            for sq_scope in &scope.subquery_scopes {
690                if sq_scope.expression == sq.this {
691                    if let Ok(child) = to_node_inner(
692                        ColumnRef::Index(0),
693                        sq_scope,
694                        dialect,
695                        &column_name,
696                        "",
697                        "",
698                        trim_selects,
699                        ancestor_cte_scopes,
700                        depth + 1,
701                    ) {
702                        node.downstream.push(child);
703                    }
704                    break;
705                }
706            }
707        }
708    }
709
710    // 7. Column references — trace each column to its source
711    let col_refs = find_column_refs_in_expr(&select_expr);
712    for col_ref in col_refs {
713        let col_name = &col_ref.column;
714        if let Some(ref table_id) = col_ref.table {
715            let tbl = &table_id.name;
716            resolve_qualified_column(
717                &mut node,
718                scope,
719                dialect,
720                tbl,
721                col_name,
722                &column_name,
723                trim_selects,
724                &all_cte_scopes,
725                depth,
726            );
727        } else {
728            resolve_unqualified_column(
729                &mut node,
730                scope,
731                dialect,
732                col_name,
733                &column_name,
734                trim_selects,
735                &all_cte_scopes,
736                depth,
737            );
738        }
739    }
740
741    Ok(node)
742}
743
744// ---------------------------------------------------------------------------
745// Set operation handling
746// ---------------------------------------------------------------------------
747
748fn handle_set_operation(
749    column: &ColumnRef<'_>,
750    scope: &Scope,
751    dialect: Option<DialectType>,
752    scope_name: &str,
753    source_name: &str,
754    reference_node_name: &str,
755    trim_selects: bool,
756    ancestor_cte_scopes: &[Scope],
757    depth: usize,
758) -> Result<LineageNode> {
759    let scope_expr = &scope.expression;
760
761    // Determine column index
762    let col_index = match column {
763        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
764        ColumnRef::Index(i) => *i,
765    };
766
767    let col_name = match column {
768        ColumnRef::Name(name) => name.to_string(),
769        ColumnRef::Index(_) => format!("_{col_index}"),
770    };
771
772    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
773    node.source_name = source_name.to_string();
774    node.reference_node_name = reference_node_name.to_string();
775
776    // Recurse into each union branch
777    for branch_scope in &scope.union_scopes {
778        if let Ok(child) = to_node_inner(
779            ColumnRef::Index(col_index),
780            branch_scope,
781            dialect,
782            scope_name,
783            "",
784            "",
785            trim_selects,
786            ancestor_cte_scopes,
787            depth + 1,
788        ) {
789            node.downstream.push(child);
790        }
791    }
792
793    Ok(node)
794}
795
796// ---------------------------------------------------------------------------
797// Column resolution helpers
798// ---------------------------------------------------------------------------
799
800fn resolve_qualified_column(
801    node: &mut LineageNode,
802    scope: &Scope,
803    dialect: Option<DialectType>,
804    table: &str,
805    col_name: &str,
806    parent_name: &str,
807    trim_selects: bool,
808    all_cte_scopes: &[&Scope],
809    depth: usize,
810) {
811    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
812    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
813    let resolved_cte_name = resolve_cte_alias(scope, table);
814    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
815
816    // Check if table is a CTE reference — check both the current scope's cte_sources
817    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
818    let is_cte = scope.cte_sources.contains_key(effective_table)
819        || all_cte_scopes.iter().any(|s| {
820            matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table)
821        });
822    if is_cte {
823        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
824            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
825            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
826            if let Ok(child) = to_node_inner(
827                ColumnRef::Name(col_name),
828                child_scope,
829                dialect,
830                parent_name,
831                effective_table,
832                parent_name,
833                trim_selects,
834                &ancestors,
835                depth + 1,
836            ) {
837                node.downstream.push(child);
838                return;
839            }
840        }
841    }
842
843    // Check if table is a derived table (is_scope = true in sources)
844    if let Some(source_info) = scope.sources.get(table) {
845        if source_info.is_scope {
846            if let Some(child_scope) = find_child_scope(scope, table) {
847                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
848                if let Ok(child) = to_node_inner(
849                    ColumnRef::Name(col_name),
850                    child_scope,
851                    dialect,
852                    parent_name,
853                    table,
854                    parent_name,
855                    trim_selects,
856                    &ancestors,
857                    depth + 1,
858                ) {
859                    node.downstream.push(child);
860                    return;
861                }
862            }
863        }
864    }
865
866    // Base table source found in current scope: preserve alias in the display name
867    // but store the resolved table expression and name for downstream consumers.
868    if let Some(source_info) = scope.sources.get(table) {
869        if !source_info.is_scope {
870            node.downstream.push(make_table_column_node_from_source(
871                table,
872                col_name,
873                &source_info.expression,
874            ));
875            return;
876        }
877    }
878
879    // Base table or unresolved — terminal node
880    node.downstream
881        .push(make_table_column_node(table, col_name));
882}
883
884/// Resolve a FROM alias to the original CTE name.
885///
886/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
887/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
888/// This function checks if `name` is such an alias and returns the CTE name.
889fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
890    // If it's already a known CTE name, no resolution needed
891    if scope.cte_sources.contains_key(name) {
892        return None;
893    }
894    // Check if the source's expression is a CTE — if so, extract the CTE name
895    if let Some(source_info) = scope.sources.get(name) {
896        if source_info.is_scope {
897            if let Expression::Cte(cte) = &source_info.expression {
898                let cte_name = &cte.alias.name;
899                if scope.cte_sources.contains_key(cte_name) {
900                    return Some(cte_name.clone());
901                }
902            }
903        }
904    }
905    None
906}
907
908fn resolve_unqualified_column(
909    node: &mut LineageNode,
910    scope: &Scope,
911    dialect: Option<DialectType>,
912    col_name: &str,
913    parent_name: &str,
914    trim_selects: bool,
915    all_cte_scopes: &[&Scope],
916    depth: usize,
917) {
918    // Try to find which source this column belongs to.
919    // Build the source list from the actual FROM/JOIN clauses to avoid
920    // mixing in CTE definitions that are in scope but not referenced.
921    let from_source_names = source_names_from_from_join(scope);
922
923    if from_source_names.len() == 1 {
924        let tbl = &from_source_names[0];
925        resolve_qualified_column(
926            node,
927            scope,
928            dialect,
929            tbl,
930            col_name,
931            parent_name,
932            trim_selects,
933            all_cte_scopes,
934            depth,
935        );
936        return;
937    }
938
939    // Multiple sources — can't resolve without schema info, add unqualified node
940    let child = LineageNode::new(
941        col_name.to_string(),
942        Expression::Column(Box::new(crate::expressions::Column {
943            name: crate::expressions::Identifier::new(col_name.to_string()),
944            table: None,
945            join_mark: false,
946            trailing_comments: vec![],
947            span: None,
948            inferred_type: None,
949        })),
950        node.source.clone(),
951    );
952    node.downstream.push(child);
953}
954
955fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
956    fn source_name(expr: &Expression) -> Option<String> {
957        match expr {
958            Expression::Table(table) => Some(
959                table
960                    .alias
961                    .as_ref()
962                    .map(|a| a.name.clone())
963                    .unwrap_or_else(|| table.name.name.clone()),
964            ),
965            Expression::Subquery(subquery) => {
966                subquery.alias.as_ref().map(|alias| alias.name.clone())
967            }
968            Expression::Paren(paren) => source_name(&paren.this),
969            _ => None,
970        }
971    }
972
973    let effective_expr = match &scope.expression {
974        Expression::Cte(cte) => &cte.this,
975        expr => expr,
976    };
977
978    let mut names = Vec::new();
979    let mut seen = std::collections::HashSet::new();
980
981    if let Expression::Select(select) = effective_expr {
982        if let Some(from) = &select.from {
983            for expr in &from.expressions {
984                if let Some(name) = source_name(expr) {
985                    if !name.is_empty() && seen.insert(name.clone()) {
986                        names.push(name);
987                    }
988                }
989            }
990        }
991        for join in &select.joins {
992            if let Some(name) = source_name(&join.this) {
993                if !name.is_empty() && seen.insert(name.clone()) {
994                    names.push(name);
995                }
996            }
997        }
998    }
999
1000    names
1001}
1002
1003// ---------------------------------------------------------------------------
1004// Helper functions
1005// ---------------------------------------------------------------------------
1006
1007/// Get the alias or name of an expression
1008fn get_alias_or_name(expr: &Expression) -> Option<String> {
1009    match expr {
1010        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1011        Expression::Column(col) => Some(col.name.name.clone()),
1012        Expression::Identifier(id) => Some(id.name.clone()),
1013        Expression::Star(_) => Some("*".to_string()),
1014        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
1015        // Unwrap to get the actual column/alias name from the inner expression.
1016        Expression::Annotated(a) => get_alias_or_name(&a.this),
1017        _ => None,
1018    }
1019}
1020
1021/// Resolve the display name for a column reference.
1022fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
1023    match column {
1024        ColumnRef::Name(n) => n.to_string(),
1025        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
1026    }
1027}
1028
1029/// Find the select expression matching a column reference.
1030fn find_select_expr(
1031    scope_expr: &Expression,
1032    column: &ColumnRef<'_>,
1033    dialect: Option<DialectType>,
1034) -> Result<Expression> {
1035    if let Expression::Select(ref select) = scope_expr {
1036        match column {
1037            ColumnRef::Name(name) => {
1038                let normalized_name = normalize_column_name(name, dialect);
1039                for expr in &select.expressions {
1040                    if let Some(alias_or_name) = get_alias_or_name(expr) {
1041                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1042                            return Ok(expr.clone());
1043                        }
1044                    }
1045                }
1046                Err(crate::error::Error::parse(
1047                    format!("Cannot find column '{}' in query", name),
1048                    0,
1049                    0,
1050                    0,
1051                    0,
1052                ))
1053            }
1054            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
1055                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
1056            }),
1057        }
1058    } else {
1059        Err(crate::error::Error::parse(
1060            "Expected SELECT expression for column lookup",
1061            0,
1062            0,
1063            0,
1064            0,
1065        ))
1066    }
1067}
1068
1069/// Find the positional index of a column name in a set operation's first SELECT branch.
1070fn column_to_index(
1071    set_op_expr: &Expression,
1072    name: &str,
1073    dialect: Option<DialectType>,
1074) -> Result<usize> {
1075    let normalized_name = normalize_column_name(name, dialect);
1076    let mut expr = set_op_expr;
1077    loop {
1078        match expr {
1079            Expression::Union(u) => expr = &u.left,
1080            Expression::Intersect(i) => expr = &i.left,
1081            Expression::Except(e) => expr = &e.left,
1082            Expression::Select(select) => {
1083                for (i, e) in select.expressions.iter().enumerate() {
1084                    if let Some(alias_or_name) = get_alias_or_name(e) {
1085                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
1086                            return Ok(i);
1087                        }
1088                    }
1089                }
1090                return Err(crate::error::Error::parse(
1091                    format!("Cannot find column '{}' in set operation", name),
1092                    0,
1093                    0,
1094                    0,
1095                    0,
1096                ));
1097            }
1098            _ => {
1099                return Err(crate::error::Error::parse(
1100                    "Expected SELECT or set operation",
1101                    0,
1102                    0,
1103                    0,
1104                    0,
1105                ))
1106            }
1107        }
1108    }
1109}
1110
1111fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
1112    normalize_name(name, dialect, false, true)
1113}
1114
1115/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
1116fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
1117    if let Expression::Select(select) = select_expr {
1118        let mut trimmed = select.as_ref().clone();
1119        trimmed.expressions = vec![target_expr.clone()];
1120        Expression::Select(Box::new(trimmed))
1121    } else {
1122        select_expr.clone()
1123    }
1124}
1125
1126/// Find the child scope (CTE or derived table) for a given source name.
1127fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
1128    // Check CTE scopes
1129    if scope.cte_sources.contains_key(source_name) {
1130        for cte_scope in &scope.cte_scopes {
1131            if let Expression::Cte(cte) = &cte_scope.expression {
1132                if cte.alias.name == source_name {
1133                    return Some(cte_scope);
1134                }
1135            }
1136        }
1137    }
1138
1139    // Check derived table scopes
1140    if let Some(source_info) = scope.sources.get(source_name) {
1141        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
1142            if let Expression::Subquery(sq) = &source_info.expression {
1143                for dt_scope in &scope.derived_table_scopes {
1144                    if dt_scope.expression == sq.this {
1145                        return Some(dt_scope);
1146                    }
1147                }
1148            }
1149        }
1150    }
1151
1152    None
1153}
1154
1155/// Find a CTE scope by name, searching through a combined list of CTE scopes.
1156/// This handles nested CTEs where the current scope doesn't have the CTE scope
1157/// as a direct child but knows about it via cte_sources.
1158fn find_child_scope_in<'a>(
1159    all_cte_scopes: &[&'a Scope],
1160    scope: &'a Scope,
1161    source_name: &str,
1162) -> Option<&'a Scope> {
1163    // First try the scope's own cte_scopes
1164    for cte_scope in &scope.cte_scopes {
1165        if let Expression::Cte(cte) = &cte_scope.expression {
1166            if cte.alias.name == source_name {
1167                return Some(cte_scope);
1168            }
1169        }
1170    }
1171
1172    // Then search through all ancestor CTE scopes
1173    for cte_scope in all_cte_scopes {
1174        if let Expression::Cte(cte) = &cte_scope.expression {
1175            if cte.alias.name == source_name {
1176                return Some(cte_scope);
1177            }
1178        }
1179    }
1180
1181    // Fall back to derived table scopes
1182    if let Some(source_info) = scope.sources.get(source_name) {
1183        if source_info.is_scope {
1184            if let Expression::Subquery(sq) = &source_info.expression {
1185                for dt_scope in &scope.derived_table_scopes {
1186                    if dt_scope.expression == sq.this {
1187                        return Some(dt_scope);
1188                    }
1189                }
1190            }
1191        }
1192    }
1193
1194    None
1195}
1196
1197/// Create a terminal lineage node for a table.column reference.
1198fn make_table_column_node(table: &str, column: &str) -> LineageNode {
1199    let mut node = LineageNode::new(
1200        format!("{}.{}", table, column),
1201        Expression::Column(Box::new(crate::expressions::Column {
1202            name: crate::expressions::Identifier::new(column.to_string()),
1203            table: Some(crate::expressions::Identifier::new(table.to_string())),
1204            join_mark: false,
1205            trailing_comments: vec![],
1206            span: None,
1207            inferred_type: None,
1208        })),
1209        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
1210    );
1211    node.source_name = table.to_string();
1212    node
1213}
1214
1215fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
1216    let mut parts: Vec<String> = Vec::new();
1217    if let Some(catalog) = &table_ref.catalog {
1218        parts.push(catalog.name.clone());
1219    }
1220    if let Some(schema) = &table_ref.schema {
1221        parts.push(schema.name.clone());
1222    }
1223    parts.push(table_ref.name.name.clone());
1224    parts.join(".")
1225}
1226
1227fn make_table_column_node_from_source(
1228    table_alias: &str,
1229    column: &str,
1230    source: &Expression,
1231) -> LineageNode {
1232    let mut node = LineageNode::new(
1233        format!("{}.{}", table_alias, column),
1234        Expression::Column(Box::new(crate::expressions::Column {
1235            name: crate::expressions::Identifier::new(column.to_string()),
1236            table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
1237            join_mark: false,
1238            trailing_comments: vec![],
1239            span: None,
1240            inferred_type: None,
1241        })),
1242        source.clone(),
1243    );
1244
1245    if let Expression::Table(table_ref) = source {
1246        node.source_name = table_name_from_table_ref(table_ref);
1247    } else {
1248        node.source_name = table_alias.to_string();
1249    }
1250
1251    node
1252}
1253
1254/// Simple column reference extracted from an expression
1255#[derive(Debug, Clone)]
1256struct SimpleColumnRef {
1257    table: Option<crate::expressions::Identifier>,
1258    column: String,
1259}
1260
1261/// Find all column references in an expression (does not recurse into subqueries).
1262fn find_column_refs_in_expr(expr: &Expression) -> Vec<SimpleColumnRef> {
1263    let mut refs = Vec::new();
1264    collect_column_refs(expr, &mut refs);
1265    refs
1266}
1267
1268fn collect_column_refs(expr: &Expression, refs: &mut Vec<SimpleColumnRef>) {
1269    let mut stack: Vec<&Expression> = vec![expr];
1270
1271    while let Some(current) = stack.pop() {
1272        match current {
1273            // === Leaf: collect Column references ===
1274            Expression::Column(col) => {
1275                refs.push(SimpleColumnRef {
1276                    table: col.table.clone(),
1277                    column: col.name.name.clone(),
1278                });
1279            }
1280
1281            // === Boundary: don't recurse into subqueries (handled separately) ===
1282            Expression::Subquery(_) | Expression::Exists(_) => {}
1283
1284            // === BinaryOp variants: left, right ===
1285            Expression::And(op)
1286            | Expression::Or(op)
1287            | Expression::Eq(op)
1288            | Expression::Neq(op)
1289            | Expression::Lt(op)
1290            | Expression::Lte(op)
1291            | Expression::Gt(op)
1292            | Expression::Gte(op)
1293            | Expression::Add(op)
1294            | Expression::Sub(op)
1295            | Expression::Mul(op)
1296            | Expression::Div(op)
1297            | Expression::Mod(op)
1298            | Expression::BitwiseAnd(op)
1299            | Expression::BitwiseOr(op)
1300            | Expression::BitwiseXor(op)
1301            | Expression::BitwiseLeftShift(op)
1302            | Expression::BitwiseRightShift(op)
1303            | Expression::Concat(op)
1304            | Expression::Adjacent(op)
1305            | Expression::TsMatch(op)
1306            | Expression::PropertyEQ(op)
1307            | Expression::ArrayContainsAll(op)
1308            | Expression::ArrayContainedBy(op)
1309            | Expression::ArrayOverlaps(op)
1310            | Expression::JSONBContainsAllTopKeys(op)
1311            | Expression::JSONBContainsAnyTopKeys(op)
1312            | Expression::JSONBDeleteAtPath(op)
1313            | Expression::ExtendsLeft(op)
1314            | Expression::ExtendsRight(op)
1315            | Expression::Is(op)
1316            | Expression::MemberOf(op)
1317            | Expression::NullSafeEq(op)
1318            | Expression::NullSafeNeq(op)
1319            | Expression::Glob(op)
1320            | Expression::Match(op) => {
1321                stack.push(&op.left);
1322                stack.push(&op.right);
1323            }
1324
1325            // === UnaryOp variants: this ===
1326            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
1327                stack.push(&u.this);
1328            }
1329
1330            // === UnaryFunc variants: this ===
1331            Expression::Upper(f)
1332            | Expression::Lower(f)
1333            | Expression::Length(f)
1334            | Expression::LTrim(f)
1335            | Expression::RTrim(f)
1336            | Expression::Reverse(f)
1337            | Expression::Abs(f)
1338            | Expression::Sqrt(f)
1339            | Expression::Cbrt(f)
1340            | Expression::Ln(f)
1341            | Expression::Exp(f)
1342            | Expression::Sign(f)
1343            | Expression::Date(f)
1344            | Expression::Time(f)
1345            | Expression::DateFromUnixDate(f)
1346            | Expression::UnixDate(f)
1347            | Expression::UnixSeconds(f)
1348            | Expression::UnixMillis(f)
1349            | Expression::UnixMicros(f)
1350            | Expression::TimeStrToDate(f)
1351            | Expression::DateToDi(f)
1352            | Expression::DiToDate(f)
1353            | Expression::TsOrDiToDi(f)
1354            | Expression::TsOrDsToDatetime(f)
1355            | Expression::TsOrDsToTimestamp(f)
1356            | Expression::YearOfWeek(f)
1357            | Expression::YearOfWeekIso(f)
1358            | Expression::Initcap(f)
1359            | Expression::Ascii(f)
1360            | Expression::Chr(f)
1361            | Expression::Soundex(f)
1362            | Expression::ByteLength(f)
1363            | Expression::Hex(f)
1364            | Expression::LowerHex(f)
1365            | Expression::Unicode(f)
1366            | Expression::Radians(f)
1367            | Expression::Degrees(f)
1368            | Expression::Sin(f)
1369            | Expression::Cos(f)
1370            | Expression::Tan(f)
1371            | Expression::Asin(f)
1372            | Expression::Acos(f)
1373            | Expression::Atan(f)
1374            | Expression::IsNan(f)
1375            | Expression::IsInf(f)
1376            | Expression::ArrayLength(f)
1377            | Expression::ArraySize(f)
1378            | Expression::Cardinality(f)
1379            | Expression::ArrayReverse(f)
1380            | Expression::ArrayDistinct(f)
1381            | Expression::ArrayFlatten(f)
1382            | Expression::ArrayCompact(f)
1383            | Expression::Explode(f)
1384            | Expression::ExplodeOuter(f)
1385            | Expression::ToArray(f)
1386            | Expression::MapFromEntries(f)
1387            | Expression::MapKeys(f)
1388            | Expression::MapValues(f)
1389            | Expression::JsonArrayLength(f)
1390            | Expression::JsonKeys(f)
1391            | Expression::JsonType(f)
1392            | Expression::ParseJson(f)
1393            | Expression::ToJson(f)
1394            | Expression::Typeof(f)
1395            | Expression::BitwiseCount(f)
1396            | Expression::Year(f)
1397            | Expression::Month(f)
1398            | Expression::Day(f)
1399            | Expression::Hour(f)
1400            | Expression::Minute(f)
1401            | Expression::Second(f)
1402            | Expression::DayOfWeek(f)
1403            | Expression::DayOfWeekIso(f)
1404            | Expression::DayOfMonth(f)
1405            | Expression::DayOfYear(f)
1406            | Expression::WeekOfYear(f)
1407            | Expression::Quarter(f)
1408            | Expression::Epoch(f)
1409            | Expression::EpochMs(f)
1410            | Expression::TimeStrToUnix(f)
1411            | Expression::SHA(f)
1412            | Expression::SHA1Digest(f)
1413            | Expression::TimeToUnix(f)
1414            | Expression::JSONBool(f)
1415            | Expression::Int64(f)
1416            | Expression::MD5NumberLower64(f)
1417            | Expression::MD5NumberUpper64(f)
1418            | Expression::DateStrToDate(f)
1419            | Expression::DateToDateStr(f) => {
1420                stack.push(&f.this);
1421            }
1422
1423            // === BinaryFunc variants: this, expression ===
1424            Expression::Power(f)
1425            | Expression::NullIf(f)
1426            | Expression::IfNull(f)
1427            | Expression::Nvl(f)
1428            | Expression::UnixToTimeStr(f)
1429            | Expression::Contains(f)
1430            | Expression::StartsWith(f)
1431            | Expression::EndsWith(f)
1432            | Expression::Levenshtein(f)
1433            | Expression::ModFunc(f)
1434            | Expression::Atan2(f)
1435            | Expression::IntDiv(f)
1436            | Expression::AddMonths(f)
1437            | Expression::MonthsBetween(f)
1438            | Expression::NextDay(f)
1439            | Expression::ArrayContains(f)
1440            | Expression::ArrayPosition(f)
1441            | Expression::ArrayAppend(f)
1442            | Expression::ArrayPrepend(f)
1443            | Expression::ArrayUnion(f)
1444            | Expression::ArrayExcept(f)
1445            | Expression::ArrayRemove(f)
1446            | Expression::StarMap(f)
1447            | Expression::MapFromArrays(f)
1448            | Expression::MapContainsKey(f)
1449            | Expression::ElementAt(f)
1450            | Expression::JsonMergePatch(f)
1451            | Expression::JSONBContains(f)
1452            | Expression::JSONBExtract(f) => {
1453                stack.push(&f.this);
1454                stack.push(&f.expression);
1455            }
1456
1457            // === VarArgFunc variants: expressions ===
1458            Expression::Greatest(f)
1459            | Expression::Least(f)
1460            | Expression::Coalesce(f)
1461            | Expression::ArrayConcat(f)
1462            | Expression::ArrayIntersect(f)
1463            | Expression::ArrayZip(f)
1464            | Expression::MapConcat(f)
1465            | Expression::JsonArray(f) => {
1466                for e in &f.expressions {
1467                    stack.push(e);
1468                }
1469            }
1470
1471            // === AggFunc variants: this, filter, having_max, limit ===
1472            Expression::Sum(f)
1473            | Expression::Avg(f)
1474            | Expression::Min(f)
1475            | Expression::Max(f)
1476            | Expression::ArrayAgg(f)
1477            | Expression::CountIf(f)
1478            | Expression::Stddev(f)
1479            | Expression::StddevPop(f)
1480            | Expression::StddevSamp(f)
1481            | Expression::Variance(f)
1482            | Expression::VarPop(f)
1483            | Expression::VarSamp(f)
1484            | Expression::Median(f)
1485            | Expression::Mode(f)
1486            | Expression::First(f)
1487            | Expression::Last(f)
1488            | Expression::AnyValue(f)
1489            | Expression::ApproxDistinct(f)
1490            | Expression::ApproxCountDistinct(f)
1491            | Expression::LogicalAnd(f)
1492            | Expression::LogicalOr(f)
1493            | Expression::Skewness(f)
1494            | Expression::ArrayConcatAgg(f)
1495            | Expression::ArrayUniqueAgg(f)
1496            | Expression::BoolXorAgg(f)
1497            | Expression::BitwiseAndAgg(f)
1498            | Expression::BitwiseOrAgg(f)
1499            | Expression::BitwiseXorAgg(f) => {
1500                stack.push(&f.this);
1501                if let Some(ref filter) = f.filter {
1502                    stack.push(filter);
1503                }
1504                if let Some((ref expr, _)) = f.having_max {
1505                    stack.push(expr);
1506                }
1507                if let Some(ref limit) = f.limit {
1508                    stack.push(limit);
1509                }
1510            }
1511
1512            // === Generic Function / AggregateFunction: args ===
1513            Expression::Function(func) => {
1514                for arg in &func.args {
1515                    stack.push(arg);
1516                }
1517            }
1518            Expression::AggregateFunction(func) => {
1519                for arg in &func.args {
1520                    stack.push(arg);
1521                }
1522                if let Some(ref filter) = func.filter {
1523                    stack.push(filter);
1524                }
1525                if let Some(ref limit) = func.limit {
1526                    stack.push(limit);
1527                }
1528            }
1529
1530            // === WindowFunction: this (skip Over for lineage purposes) ===
1531            Expression::WindowFunction(wf) => {
1532                stack.push(&wf.this);
1533            }
1534
1535            // === Containers and special expressions ===
1536            Expression::Alias(a) => {
1537                stack.push(&a.this);
1538            }
1539            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
1540                stack.push(&c.this);
1541                if let Some(ref fmt) = c.format {
1542                    stack.push(fmt);
1543                }
1544                if let Some(ref def) = c.default {
1545                    stack.push(def);
1546                }
1547            }
1548            Expression::Paren(p) => {
1549                stack.push(&p.this);
1550            }
1551            Expression::Annotated(a) => {
1552                stack.push(&a.this);
1553            }
1554            Expression::Case(case) => {
1555                if let Some(ref operand) = case.operand {
1556                    stack.push(operand);
1557                }
1558                for (cond, result) in &case.whens {
1559                    stack.push(cond);
1560                    stack.push(result);
1561                }
1562                if let Some(ref else_expr) = case.else_ {
1563                    stack.push(else_expr);
1564                }
1565            }
1566            Expression::Collation(c) => {
1567                stack.push(&c.this);
1568            }
1569            Expression::In(i) => {
1570                stack.push(&i.this);
1571                for e in &i.expressions {
1572                    stack.push(e);
1573                }
1574                if let Some(ref q) = i.query {
1575                    stack.push(q);
1576                }
1577                if let Some(ref u) = i.unnest {
1578                    stack.push(u);
1579                }
1580            }
1581            Expression::Between(b) => {
1582                stack.push(&b.this);
1583                stack.push(&b.low);
1584                stack.push(&b.high);
1585            }
1586            Expression::IsNull(n) => {
1587                stack.push(&n.this);
1588            }
1589            Expression::IsTrue(t) | Expression::IsFalse(t) => {
1590                stack.push(&t.this);
1591            }
1592            Expression::IsJson(j) => {
1593                stack.push(&j.this);
1594            }
1595            Expression::Like(l) | Expression::ILike(l) => {
1596                stack.push(&l.left);
1597                stack.push(&l.right);
1598                if let Some(ref esc) = l.escape {
1599                    stack.push(esc);
1600                }
1601            }
1602            Expression::SimilarTo(s) => {
1603                stack.push(&s.this);
1604                stack.push(&s.pattern);
1605                if let Some(ref esc) = s.escape {
1606                    stack.push(esc);
1607                }
1608            }
1609            Expression::Ordered(o) => {
1610                stack.push(&o.this);
1611            }
1612            Expression::Array(a) => {
1613                for e in &a.expressions {
1614                    stack.push(e);
1615                }
1616            }
1617            Expression::Tuple(t) => {
1618                for e in &t.expressions {
1619                    stack.push(e);
1620                }
1621            }
1622            Expression::Struct(s) => {
1623                for (_, e) in &s.fields {
1624                    stack.push(e);
1625                }
1626            }
1627            Expression::Subscript(s) => {
1628                stack.push(&s.this);
1629                stack.push(&s.index);
1630            }
1631            Expression::Dot(d) => {
1632                stack.push(&d.this);
1633            }
1634            Expression::MethodCall(m) => {
1635                stack.push(&m.this);
1636                for arg in &m.args {
1637                    stack.push(arg);
1638                }
1639            }
1640            Expression::ArraySlice(s) => {
1641                stack.push(&s.this);
1642                if let Some(ref start) = s.start {
1643                    stack.push(start);
1644                }
1645                if let Some(ref end) = s.end {
1646                    stack.push(end);
1647                }
1648            }
1649            Expression::Lambda(l) => {
1650                stack.push(&l.body);
1651            }
1652            Expression::NamedArgument(n) => {
1653                stack.push(&n.value);
1654            }
1655            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
1656                stack.push(e);
1657            }
1658
1659            // === Custom function structs ===
1660            Expression::Substring(f) => {
1661                stack.push(&f.this);
1662                stack.push(&f.start);
1663                if let Some(ref len) = f.length {
1664                    stack.push(len);
1665                }
1666            }
1667            Expression::Trim(f) => {
1668                stack.push(&f.this);
1669                if let Some(ref chars) = f.characters {
1670                    stack.push(chars);
1671                }
1672            }
1673            Expression::Replace(f) => {
1674                stack.push(&f.this);
1675                stack.push(&f.old);
1676                stack.push(&f.new);
1677            }
1678            Expression::IfFunc(f) => {
1679                stack.push(&f.condition);
1680                stack.push(&f.true_value);
1681                if let Some(ref fv) = f.false_value {
1682                    stack.push(fv);
1683                }
1684            }
1685            Expression::Nvl2(f) => {
1686                stack.push(&f.this);
1687                stack.push(&f.true_value);
1688                stack.push(&f.false_value);
1689            }
1690            Expression::ConcatWs(f) => {
1691                stack.push(&f.separator);
1692                for e in &f.expressions {
1693                    stack.push(e);
1694                }
1695            }
1696            Expression::Count(f) => {
1697                if let Some(ref this) = f.this {
1698                    stack.push(this);
1699                }
1700                if let Some(ref filter) = f.filter {
1701                    stack.push(filter);
1702                }
1703            }
1704            Expression::GroupConcat(f) => {
1705                stack.push(&f.this);
1706                if let Some(ref sep) = f.separator {
1707                    stack.push(sep);
1708                }
1709                if let Some(ref filter) = f.filter {
1710                    stack.push(filter);
1711                }
1712            }
1713            Expression::StringAgg(f) => {
1714                stack.push(&f.this);
1715                if let Some(ref sep) = f.separator {
1716                    stack.push(sep);
1717                }
1718                if let Some(ref filter) = f.filter {
1719                    stack.push(filter);
1720                }
1721                if let Some(ref limit) = f.limit {
1722                    stack.push(limit);
1723                }
1724            }
1725            Expression::ListAgg(f) => {
1726                stack.push(&f.this);
1727                if let Some(ref sep) = f.separator {
1728                    stack.push(sep);
1729                }
1730                if let Some(ref filter) = f.filter {
1731                    stack.push(filter);
1732                }
1733            }
1734            Expression::SumIf(f) => {
1735                stack.push(&f.this);
1736                stack.push(&f.condition);
1737                if let Some(ref filter) = f.filter {
1738                    stack.push(filter);
1739                }
1740            }
1741            Expression::DateAdd(f) | Expression::DateSub(f) => {
1742                stack.push(&f.this);
1743                stack.push(&f.interval);
1744            }
1745            Expression::DateDiff(f) => {
1746                stack.push(&f.this);
1747                stack.push(&f.expression);
1748            }
1749            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
1750                stack.push(&f.this);
1751            }
1752            Expression::Extract(f) => {
1753                stack.push(&f.this);
1754            }
1755            Expression::Round(f) => {
1756                stack.push(&f.this);
1757                if let Some(ref d) = f.decimals {
1758                    stack.push(d);
1759                }
1760            }
1761            Expression::Floor(f) => {
1762                stack.push(&f.this);
1763                if let Some(ref s) = f.scale {
1764                    stack.push(s);
1765                }
1766                if let Some(ref t) = f.to {
1767                    stack.push(t);
1768                }
1769            }
1770            Expression::Ceil(f) => {
1771                stack.push(&f.this);
1772                if let Some(ref d) = f.decimals {
1773                    stack.push(d);
1774                }
1775                if let Some(ref t) = f.to {
1776                    stack.push(t);
1777                }
1778            }
1779            Expression::Log(f) => {
1780                stack.push(&f.this);
1781                if let Some(ref b) = f.base {
1782                    stack.push(b);
1783                }
1784            }
1785            Expression::AtTimeZone(f) => {
1786                stack.push(&f.this);
1787                stack.push(&f.zone);
1788            }
1789            Expression::Lead(f) | Expression::Lag(f) => {
1790                stack.push(&f.this);
1791                if let Some(ref off) = f.offset {
1792                    stack.push(off);
1793                }
1794                if let Some(ref def) = f.default {
1795                    stack.push(def);
1796                }
1797            }
1798            Expression::FirstValue(f) | Expression::LastValue(f) => {
1799                stack.push(&f.this);
1800            }
1801            Expression::NthValue(f) => {
1802                stack.push(&f.this);
1803                stack.push(&f.offset);
1804            }
1805            Expression::Position(f) => {
1806                stack.push(&f.substring);
1807                stack.push(&f.string);
1808                if let Some(ref start) = f.start {
1809                    stack.push(start);
1810                }
1811            }
1812            Expression::Decode(f) => {
1813                stack.push(&f.this);
1814                for (search, result) in &f.search_results {
1815                    stack.push(search);
1816                    stack.push(result);
1817                }
1818                if let Some(ref def) = f.default {
1819                    stack.push(def);
1820                }
1821            }
1822            Expression::CharFunc(f) => {
1823                for arg in &f.args {
1824                    stack.push(arg);
1825                }
1826            }
1827            Expression::ArraySort(f) => {
1828                stack.push(&f.this);
1829                if let Some(ref cmp) = f.comparator {
1830                    stack.push(cmp);
1831                }
1832            }
1833            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
1834                stack.push(&f.this);
1835                stack.push(&f.separator);
1836                if let Some(ref nr) = f.null_replacement {
1837                    stack.push(nr);
1838                }
1839            }
1840            Expression::ArrayFilter(f) => {
1841                stack.push(&f.this);
1842                stack.push(&f.filter);
1843            }
1844            Expression::ArrayTransform(f) => {
1845                stack.push(&f.this);
1846                stack.push(&f.transform);
1847            }
1848            Expression::Sequence(f)
1849            | Expression::Generate(f)
1850            | Expression::ExplodingGenerateSeries(f) => {
1851                stack.push(&f.start);
1852                stack.push(&f.stop);
1853                if let Some(ref step) = f.step {
1854                    stack.push(step);
1855                }
1856            }
1857            Expression::JsonExtract(f)
1858            | Expression::JsonExtractScalar(f)
1859            | Expression::JsonQuery(f)
1860            | Expression::JsonValue(f) => {
1861                stack.push(&f.this);
1862                stack.push(&f.path);
1863            }
1864            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
1865                stack.push(&f.this);
1866                for p in &f.paths {
1867                    stack.push(p);
1868                }
1869            }
1870            Expression::JsonObject(f) => {
1871                for (k, v) in &f.pairs {
1872                    stack.push(k);
1873                    stack.push(v);
1874                }
1875            }
1876            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
1877                stack.push(&f.this);
1878                for (path, val) in &f.path_values {
1879                    stack.push(path);
1880                    stack.push(val);
1881                }
1882            }
1883            Expression::Overlay(f) => {
1884                stack.push(&f.this);
1885                stack.push(&f.replacement);
1886                stack.push(&f.from);
1887                if let Some(ref len) = f.length {
1888                    stack.push(len);
1889                }
1890            }
1891            Expression::Convert(f) => {
1892                stack.push(&f.this);
1893                if let Some(ref style) = f.style {
1894                    stack.push(style);
1895                }
1896            }
1897            Expression::ApproxPercentile(f) => {
1898                stack.push(&f.this);
1899                stack.push(&f.percentile);
1900                if let Some(ref acc) = f.accuracy {
1901                    stack.push(acc);
1902                }
1903                if let Some(ref filter) = f.filter {
1904                    stack.push(filter);
1905                }
1906            }
1907            Expression::Percentile(f)
1908            | Expression::PercentileCont(f)
1909            | Expression::PercentileDisc(f) => {
1910                stack.push(&f.this);
1911                stack.push(&f.percentile);
1912                if let Some(ref filter) = f.filter {
1913                    stack.push(filter);
1914                }
1915            }
1916            Expression::WithinGroup(f) => {
1917                stack.push(&f.this);
1918            }
1919            Expression::Left(f) | Expression::Right(f) => {
1920                stack.push(&f.this);
1921                stack.push(&f.length);
1922            }
1923            Expression::Repeat(f) => {
1924                stack.push(&f.this);
1925                stack.push(&f.times);
1926            }
1927            Expression::Lpad(f) | Expression::Rpad(f) => {
1928                stack.push(&f.this);
1929                stack.push(&f.length);
1930                if let Some(ref fill) = f.fill {
1931                    stack.push(fill);
1932                }
1933            }
1934            Expression::Split(f) => {
1935                stack.push(&f.this);
1936                stack.push(&f.delimiter);
1937            }
1938            Expression::RegexpLike(f) => {
1939                stack.push(&f.this);
1940                stack.push(&f.pattern);
1941                if let Some(ref flags) = f.flags {
1942                    stack.push(flags);
1943                }
1944            }
1945            Expression::RegexpReplace(f) => {
1946                stack.push(&f.this);
1947                stack.push(&f.pattern);
1948                stack.push(&f.replacement);
1949                if let Some(ref flags) = f.flags {
1950                    stack.push(flags);
1951                }
1952            }
1953            Expression::RegexpExtract(f) => {
1954                stack.push(&f.this);
1955                stack.push(&f.pattern);
1956                if let Some(ref group) = f.group {
1957                    stack.push(group);
1958                }
1959            }
1960            Expression::ToDate(f) => {
1961                stack.push(&f.this);
1962                if let Some(ref fmt) = f.format {
1963                    stack.push(fmt);
1964                }
1965            }
1966            Expression::ToTimestamp(f) => {
1967                stack.push(&f.this);
1968                if let Some(ref fmt) = f.format {
1969                    stack.push(fmt);
1970                }
1971            }
1972            Expression::DateFormat(f) | Expression::FormatDate(f) => {
1973                stack.push(&f.this);
1974                stack.push(&f.format);
1975            }
1976            Expression::LastDay(f) => {
1977                stack.push(&f.this);
1978            }
1979            Expression::FromUnixtime(f) => {
1980                stack.push(&f.this);
1981                if let Some(ref fmt) = f.format {
1982                    stack.push(fmt);
1983                }
1984            }
1985            Expression::UnixTimestamp(f) => {
1986                if let Some(ref this) = f.this {
1987                    stack.push(this);
1988                }
1989                if let Some(ref fmt) = f.format {
1990                    stack.push(fmt);
1991                }
1992            }
1993            Expression::MakeDate(f) => {
1994                stack.push(&f.year);
1995                stack.push(&f.month);
1996                stack.push(&f.day);
1997            }
1998            Expression::MakeTimestamp(f) => {
1999                stack.push(&f.year);
2000                stack.push(&f.month);
2001                stack.push(&f.day);
2002                stack.push(&f.hour);
2003                stack.push(&f.minute);
2004                stack.push(&f.second);
2005                if let Some(ref tz) = f.timezone {
2006                    stack.push(tz);
2007                }
2008            }
2009            Expression::TruncFunc(f) => {
2010                stack.push(&f.this);
2011                if let Some(ref d) = f.decimals {
2012                    stack.push(d);
2013                }
2014            }
2015            Expression::ArrayFunc(f) => {
2016                for e in &f.expressions {
2017                    stack.push(e);
2018                }
2019            }
2020            Expression::Unnest(f) => {
2021                stack.push(&f.this);
2022                for e in &f.expressions {
2023                    stack.push(e);
2024                }
2025            }
2026            Expression::StructFunc(f) => {
2027                for (_, e) in &f.fields {
2028                    stack.push(e);
2029                }
2030            }
2031            Expression::StructExtract(f) => {
2032                stack.push(&f.this);
2033            }
2034            Expression::NamedStruct(f) => {
2035                for (k, v) in &f.pairs {
2036                    stack.push(k);
2037                    stack.push(v);
2038                }
2039            }
2040            Expression::MapFunc(f) => {
2041                for k in &f.keys {
2042                    stack.push(k);
2043                }
2044                for v in &f.values {
2045                    stack.push(v);
2046                }
2047            }
2048            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
2049                stack.push(&f.this);
2050                stack.push(&f.transform);
2051            }
2052            Expression::JsonArrayAgg(f) => {
2053                stack.push(&f.this);
2054                if let Some(ref filter) = f.filter {
2055                    stack.push(filter);
2056                }
2057            }
2058            Expression::JsonObjectAgg(f) => {
2059                stack.push(&f.key);
2060                stack.push(&f.value);
2061                if let Some(ref filter) = f.filter {
2062                    stack.push(filter);
2063                }
2064            }
2065            Expression::NTile(f) => {
2066                if let Some(ref n) = f.num_buckets {
2067                    stack.push(n);
2068                }
2069            }
2070            Expression::Rand(f) => {
2071                if let Some(ref s) = f.seed {
2072                    stack.push(s);
2073                }
2074                if let Some(ref lo) = f.lower {
2075                    stack.push(lo);
2076                }
2077                if let Some(ref hi) = f.upper {
2078                    stack.push(hi);
2079                }
2080            }
2081            Expression::Any(q) | Expression::All(q) => {
2082                stack.push(&q.this);
2083                stack.push(&q.subquery);
2084            }
2085            Expression::Overlaps(o) => {
2086                if let Some(ref this) = o.this {
2087                    stack.push(this);
2088                }
2089                if let Some(ref expr) = o.expression {
2090                    stack.push(expr);
2091                }
2092                if let Some(ref ls) = o.left_start {
2093                    stack.push(ls);
2094                }
2095                if let Some(ref le) = o.left_end {
2096                    stack.push(le);
2097                }
2098                if let Some(ref rs) = o.right_start {
2099                    stack.push(rs);
2100                }
2101                if let Some(ref re) = o.right_end {
2102                    stack.push(re);
2103                }
2104            }
2105            Expression::Interval(i) => {
2106                if let Some(ref this) = i.this {
2107                    stack.push(this);
2108                }
2109            }
2110            Expression::TimeStrToTime(f) => {
2111                stack.push(&f.this);
2112                if let Some(ref zone) = f.zone {
2113                    stack.push(zone);
2114                }
2115            }
2116            Expression::JSONBExtractScalar(f) => {
2117                stack.push(&f.this);
2118                stack.push(&f.expression);
2119                if let Some(ref jt) = f.json_type {
2120                    stack.push(jt);
2121                }
2122            }
2123
2124            // === True leaves and non-expression-bearing nodes ===
2125            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
2126            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
2127            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
2128            _ => {}
2129        }
2130    }
2131}
2132
2133// ---------------------------------------------------------------------------
2134// Tests
2135// ---------------------------------------------------------------------------
2136
2137#[cfg(test)]
2138mod tests {
2139    use super::*;
2140    use crate::dialects::{Dialect, DialectType};
2141    use crate::expressions::DataType;
2142    use crate::optimizer::annotate_types::annotate_types;
2143    use crate::parse_one;
2144    use crate::schema::{MappingSchema, Schema};
2145
2146    fn parse(sql: &str) -> Expression {
2147        let dialect = Dialect::get(DialectType::Generic);
2148        let ast = dialect.parse(sql).unwrap();
2149        ast.into_iter().next().unwrap()
2150    }
2151
2152    #[test]
2153    fn test_simple_lineage() {
2154        let expr = parse("SELECT a FROM t");
2155        let node = lineage("a", &expr, None, false).unwrap();
2156
2157        assert_eq!(node.name, "a");
2158        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
2159        // Should trace to t.a
2160        let names = node.downstream_names();
2161        assert!(
2162            names.iter().any(|n| n == "t.a"),
2163            "Expected t.a in downstream, got: {:?}",
2164            names
2165        );
2166    }
2167
2168    #[test]
2169    fn test_lineage_walk() {
2170        let root = LineageNode {
2171            name: "col_a".to_string(),
2172            expression: Expression::Null(crate::expressions::Null),
2173            source: Expression::Null(crate::expressions::Null),
2174            downstream: vec![LineageNode::new(
2175                "t.a",
2176                Expression::Null(crate::expressions::Null),
2177                Expression::Null(crate::expressions::Null),
2178            )],
2179            source_name: String::new(),
2180            reference_node_name: String::new(),
2181        };
2182
2183        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
2184        assert_eq!(names.len(), 2);
2185        assert_eq!(names[0], "col_a");
2186        assert_eq!(names[1], "t.a");
2187    }
2188
2189    #[test]
2190    fn test_aliased_column() {
2191        let expr = parse("SELECT a + 1 AS b FROM t");
2192        let node = lineage("b", &expr, None, false).unwrap();
2193
2194        assert_eq!(node.name, "b");
2195        // Should trace through the expression to t.a
2196        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2197        assert!(
2198            all_names.iter().any(|n| n.contains("a")),
2199            "Expected to trace to column a, got: {:?}",
2200            all_names
2201        );
2202    }
2203
2204    #[test]
2205    fn test_qualified_column() {
2206        let expr = parse("SELECT t.a FROM t");
2207        let node = lineage("a", &expr, None, false).unwrap();
2208
2209        assert_eq!(node.name, "a");
2210        let names = node.downstream_names();
2211        assert!(
2212            names.iter().any(|n| n == "t.a"),
2213            "Expected t.a, got: {:?}",
2214            names
2215        );
2216    }
2217
2218    #[test]
2219    fn test_unqualified_column() {
2220        let expr = parse("SELECT a FROM t");
2221        let node = lineage("a", &expr, None, false).unwrap();
2222
2223        // Unqualified but single source → resolved to t.a
2224        let names = node.downstream_names();
2225        assert!(
2226            names.iter().any(|n| n == "t.a"),
2227            "Expected t.a, got: {:?}",
2228            names
2229        );
2230    }
2231
2232    #[test]
2233    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
2234        let query = "SELECT name FROM users";
2235        let dialect = Dialect::get(DialectType::BigQuery);
2236        let expr = dialect
2237            .parse(query)
2238            .unwrap()
2239            .into_iter()
2240            .next()
2241            .expect("expected one expression");
2242
2243        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2244        schema
2245            .add_table("users", &[("name".into(), DataType::Text)], None)
2246            .expect("schema setup");
2247
2248        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
2249            .expect("lineage without schema");
2250        let mut expr_without = node_without_schema.expression.clone();
2251        annotate_types(
2252            &mut expr_without,
2253            Some(&schema),
2254            Some(DialectType::BigQuery),
2255        );
2256        assert_eq!(
2257            expr_without.inferred_type(),
2258            None,
2259            "Expected unresolved root type without schema-aware lineage qualification"
2260        );
2261
2262        let node_with_schema = lineage_with_schema(
2263            "name",
2264            &expr,
2265            Some(&schema),
2266            Some(DialectType::BigQuery),
2267            false,
2268        )
2269        .expect("lineage with schema");
2270        let mut expr_with = node_with_schema.expression.clone();
2271        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
2272
2273        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
2274    }
2275
2276    #[test]
2277    fn test_lineage_with_schema_correlated_scalar_subquery() {
2278        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
2279        let dialect = Dialect::get(DialectType::BigQuery);
2280        let expr = dialect
2281            .parse(query)
2282            .unwrap()
2283            .into_iter()
2284            .next()
2285            .expect("expected one expression");
2286
2287        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2288        schema
2289            .add_table(
2290                "t1",
2291                &[("id".into(), DataType::BigInt { length: None })],
2292                None,
2293            )
2294            .expect("schema setup");
2295        schema
2296            .add_table(
2297                "t2",
2298                &[
2299                    ("id".into(), DataType::BigInt { length: None }),
2300                    ("val".into(), DataType::BigInt { length: None }),
2301                ],
2302                None,
2303            )
2304            .expect("schema setup");
2305
2306        let node = lineage_with_schema(
2307            "id",
2308            &expr,
2309            Some(&schema),
2310            Some(DialectType::BigQuery),
2311            false,
2312        )
2313        .expect("lineage_with_schema should handle correlated scalar subqueries");
2314
2315        assert_eq!(node.name, "id");
2316    }
2317
2318    #[test]
2319    fn test_lineage_with_schema_join_using() {
2320        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
2321        let dialect = Dialect::get(DialectType::BigQuery);
2322        let expr = dialect
2323            .parse(query)
2324            .unwrap()
2325            .into_iter()
2326            .next()
2327            .expect("expected one expression");
2328
2329        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2330        schema
2331            .add_table(
2332                "t1",
2333                &[("a".into(), DataType::BigInt { length: None })],
2334                None,
2335            )
2336            .expect("schema setup");
2337        schema
2338            .add_table(
2339                "t2",
2340                &[("a".into(), DataType::BigInt { length: None })],
2341                None,
2342            )
2343            .expect("schema setup");
2344
2345        let node = lineage_with_schema(
2346            "a",
2347            &expr,
2348            Some(&schema),
2349            Some(DialectType::BigQuery),
2350            false,
2351        )
2352        .expect("lineage_with_schema should handle JOIN USING");
2353
2354        assert_eq!(node.name, "a");
2355    }
2356
2357    #[test]
2358    fn test_lineage_with_schema_qualified_table_name() {
2359        let query = "SELECT a FROM raw.t1";
2360        let dialect = Dialect::get(DialectType::BigQuery);
2361        let expr = dialect
2362            .parse(query)
2363            .unwrap()
2364            .into_iter()
2365            .next()
2366            .expect("expected one expression");
2367
2368        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2369        schema
2370            .add_table(
2371                "raw.t1",
2372                &[("a".into(), DataType::BigInt { length: None })],
2373                None,
2374            )
2375            .expect("schema setup");
2376
2377        let node = lineage_with_schema(
2378            "a",
2379            &expr,
2380            Some(&schema),
2381            Some(DialectType::BigQuery),
2382            false,
2383        )
2384        .expect("lineage_with_schema should handle dotted schema.table names");
2385
2386        assert_eq!(node.name, "a");
2387    }
2388
2389    #[test]
2390    fn test_lineage_with_schema_none_matches_lineage() {
2391        let expr = parse("SELECT a FROM t");
2392        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
2393        let with_none =
2394            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
2395
2396        assert_eq!(with_none.name, baseline.name);
2397        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
2398    }
2399
2400    #[test]
2401    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2402        let dialect = Dialect::get(DialectType::BigQuery);
2403        let expr = dialect
2404            .parse("SELECT Name AS name FROM teams")
2405            .unwrap()
2406            .into_iter()
2407            .next()
2408            .expect("expected one expression");
2409
2410        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2411        schema
2412            .add_table(
2413                "teams",
2414                &[("Name".into(), DataType::String { length: None })],
2415                None,
2416            )
2417            .expect("schema setup");
2418
2419        let node = lineage_with_schema(
2420            "name",
2421            &expr,
2422            Some(&schema),
2423            Some(DialectType::BigQuery),
2424            false,
2425        )
2426        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2427
2428        let names = node.downstream_names();
2429        assert!(
2430            names.iter().any(|n| n == "teams.Name"),
2431            "Expected teams.Name in downstream, got: {:?}",
2432            names
2433        );
2434    }
2435
2436    #[test]
2437    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2438        let dialect = Dialect::get(DialectType::BigQuery);
2439        let expr = dialect
2440            .parse("SELECT Name AS Name FROM teams")
2441            .unwrap()
2442            .into_iter()
2443            .next()
2444            .expect("expected one expression");
2445
2446        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2447            .expect("lineage should resolve mixed-case aliases in BigQuery");
2448
2449        assert_eq!(node.name, "name");
2450    }
2451
2452    #[test]
2453    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2454        let expr = parse_one(
2455            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2456            DialectType::Snowflake,
2457        )
2458        .expect("parse");
2459
2460        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2461        schema
2462            .add_table(
2463                "fact.some_daily_metrics",
2464                &[("date_utc".to_string(), DataType::Date)],
2465                None,
2466            )
2467            .expect("schema setup");
2468
2469        let node = lineage_with_schema(
2470            "recency",
2471            &expr,
2472            Some(&schema),
2473            Some(DialectType::Snowflake),
2474            false,
2475        )
2476        .expect("lineage_with_schema should not treat date part as a column");
2477
2478        let names = node.downstream_names();
2479        assert!(
2480            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2481            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2482            names
2483        );
2484        assert!(
2485            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2486            "Did not expect date part to appear as lineage column, got: {:?}",
2487            names
2488        );
2489    }
2490
2491    #[test]
2492    fn test_snowflake_datediff_parses_to_typed_ast() {
2493        let expr = parse_one(
2494            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2495            DialectType::Snowflake,
2496        )
2497        .expect("parse");
2498
2499        match expr {
2500            Expression::Select(select) => match &select.expressions[0] {
2501                Expression::Alias(alias) => match &alias.this {
2502                    Expression::DateDiff(f) => {
2503                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2504                    }
2505                    other => panic!("expected DateDiff, got {other:?}"),
2506                },
2507                other => panic!("expected Alias, got {other:?}"),
2508            },
2509            other => panic!("expected Select, got {other:?}"),
2510        }
2511    }
2512
2513    #[test]
2514    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2515        let expr = parse_one(
2516            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2517            DialectType::Snowflake,
2518        )
2519        .expect("parse");
2520
2521        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2522        schema
2523            .add_table(
2524                "fact.some_daily_metrics",
2525                &[("date_utc".to_string(), DataType::Date)],
2526                None,
2527            )
2528            .expect("schema setup");
2529
2530        let node = lineage_with_schema(
2531            "next_day",
2532            &expr,
2533            Some(&schema),
2534            Some(DialectType::Snowflake),
2535            false,
2536        )
2537        .expect("lineage_with_schema should not treat DATEADD date part as a column");
2538
2539        let names = node.downstream_names();
2540        assert!(
2541            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2542            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2543            names
2544        );
2545        assert!(
2546            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2547            "Did not expect date part to appear as lineage column, got: {:?}",
2548            names
2549        );
2550    }
2551
2552    #[test]
2553    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2554        let expr = parse_one(
2555            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2556            DialectType::Snowflake,
2557        )
2558        .expect("parse");
2559
2560        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2561        schema
2562            .add_table(
2563                "fact.some_daily_metrics",
2564                &[("date_utc".to_string(), DataType::Date)],
2565                None,
2566            )
2567            .expect("schema setup");
2568
2569        let node = lineage_with_schema(
2570            "day_part",
2571            &expr,
2572            Some(&schema),
2573            Some(DialectType::Snowflake),
2574            false,
2575        )
2576        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2577
2578        let names = node.downstream_names();
2579        assert!(
2580            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2581            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2582            names
2583        );
2584        assert!(
2585            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2586            "Did not expect date part to appear as lineage column, got: {:?}",
2587            names
2588        );
2589    }
2590
2591    #[test]
2592    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2593        let expr = parse_one(
2594            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2595            DialectType::Snowflake,
2596        )
2597        .expect("parse");
2598
2599        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2600        schema
2601            .add_table(
2602                "fact.some_daily_metrics",
2603                &[("date_utc".to_string(), DataType::Date)],
2604                None,
2605            )
2606            .expect("schema setup");
2607
2608        let node = lineage_with_schema(
2609            "day_part",
2610            &expr,
2611            Some(&schema),
2612            Some(DialectType::Snowflake),
2613            false,
2614        )
2615        .expect("quoted DATE_PART should continue to work");
2616
2617        let names = node.downstream_names();
2618        assert!(
2619            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2620            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2621            names
2622        );
2623    }
2624
2625    #[test]
2626    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2627        let expr = parse_one(
2628            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2629            DialectType::Snowflake,
2630        )
2631        .expect("parse");
2632
2633        match expr {
2634            Expression::Select(select) => match &select.expressions[0] {
2635                Expression::Alias(alias) => match &alias.this {
2636                    Expression::Function(f) => {
2637                        assert_eq!(f.name.to_uppercase(), "DATEADD");
2638                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2639                    }
2640                    other => panic!("expected generic DATEADD function, got {other:?}"),
2641                },
2642                other => panic!("expected Alias, got {other:?}"),
2643            },
2644            other => panic!("expected Select, got {other:?}"),
2645        }
2646    }
2647
2648    #[test]
2649    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2650        let expr = parse_one(
2651            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2652            DialectType::Snowflake,
2653        )
2654        .expect("parse");
2655
2656        match expr {
2657            Expression::Select(select) => match &select.expressions[0] {
2658                Expression::Alias(alias) => match &alias.this {
2659                    Expression::Function(f) => {
2660                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2661                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2662                    }
2663                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2664                },
2665                other => panic!("expected Alias, got {other:?}"),
2666            },
2667            other => panic!("expected Select, got {other:?}"),
2668        }
2669    }
2670
2671    #[test]
2672    fn test_snowflake_date_part_string_literal_stays_generic_function() {
2673        let expr = parse_one(
2674            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2675            DialectType::Snowflake,
2676        )
2677        .expect("parse");
2678
2679        match expr {
2680            Expression::Select(select) => match &select.expressions[0] {
2681                Expression::Alias(alias) => match &alias.this {
2682                    Expression::Function(f) => {
2683                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2684                    }
2685                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2686                },
2687                other => panic!("expected Alias, got {other:?}"),
2688            },
2689            other => panic!("expected Select, got {other:?}"),
2690        }
2691    }
2692
2693    #[test]
2694    fn test_lineage_join() {
2695        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2696
2697        let node_a = lineage("a", &expr, None, false).unwrap();
2698        let names_a = node_a.downstream_names();
2699        assert!(
2700            names_a.iter().any(|n| n == "t.a"),
2701            "Expected t.a, got: {:?}",
2702            names_a
2703        );
2704
2705        let node_b = lineage("b", &expr, None, false).unwrap();
2706        let names_b = node_b.downstream_names();
2707        assert!(
2708            names_b.iter().any(|n| n == "s.b"),
2709            "Expected s.b, got: {:?}",
2710            names_b
2711        );
2712    }
2713
2714    #[test]
2715    fn test_lineage_alias_leaf_has_resolved_source_name() {
2716        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2717        let node = lineage("col1", &expr, None, false).unwrap();
2718
2719        // Keep alias in the display lineage edge.
2720        let names = node.downstream_names();
2721        assert!(
2722            names.iter().any(|n| n == "t1.col1"),
2723            "Expected aliased column edge t1.col1, got: {:?}",
2724            names
2725        );
2726
2727        // Leaf should expose the resolved base table for consumers.
2728        let leaf = node
2729            .downstream
2730            .iter()
2731            .find(|n| n.name == "t1.col1")
2732            .expect("Expected t1.col1 leaf");
2733        assert_eq!(leaf.source_name, "table1");
2734        match &leaf.source {
2735            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2736            _ => panic!("Expected leaf source to be a table expression"),
2737        }
2738    }
2739
2740    #[test]
2741    fn test_lineage_derived_table() {
2742        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2743        let node = lineage("a", &expr, None, false).unwrap();
2744
2745        assert_eq!(node.name, "a");
2746        // Should trace through the derived table to t.a
2747        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2748        assert!(
2749            all_names.iter().any(|n| n == "t.a"),
2750            "Expected to trace through derived table to t.a, got: {:?}",
2751            all_names
2752        );
2753    }
2754
2755    #[test]
2756    fn test_lineage_cte() {
2757        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2758        let node = lineage("a", &expr, None, false).unwrap();
2759
2760        assert_eq!(node.name, "a");
2761        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2762        assert!(
2763            all_names.iter().any(|n| n == "t.a"),
2764            "Expected to trace through CTE to t.a, got: {:?}",
2765            all_names
2766        );
2767    }
2768
2769    #[test]
2770    fn test_lineage_union() {
2771        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2772        let node = lineage("a", &expr, None, false).unwrap();
2773
2774        assert_eq!(node.name, "a");
2775        // Should have 2 downstream branches
2776        assert_eq!(
2777            node.downstream.len(),
2778            2,
2779            "Expected 2 branches for UNION, got {}",
2780            node.downstream.len()
2781        );
2782    }
2783
2784    #[test]
2785    fn test_lineage_cte_union() {
2786        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2787        let node = lineage("a", &expr, None, false).unwrap();
2788
2789        // Should trace through CTE into both UNION branches
2790        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2791        assert!(
2792            all_names.len() >= 3,
2793            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2794            all_names
2795        );
2796    }
2797
2798    #[test]
2799    fn test_lineage_star() {
2800        let expr = parse("SELECT * FROM t");
2801        let node = lineage("*", &expr, None, false).unwrap();
2802
2803        assert_eq!(node.name, "*");
2804        // Should have downstream for table t
2805        assert!(
2806            !node.downstream.is_empty(),
2807            "Star should produce downstream nodes"
2808        );
2809    }
2810
2811    #[test]
2812    fn test_lineage_subquery_in_select() {
2813        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2814        let node = lineage("x", &expr, None, false).unwrap();
2815
2816        assert_eq!(node.name, "x");
2817        // Should have traced into the scalar subquery
2818        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2819        assert!(
2820            all_names.len() >= 2,
2821            "Expected tracing into scalar subquery, got: {:?}",
2822            all_names
2823        );
2824    }
2825
2826    #[test]
2827    fn test_lineage_multiple_columns() {
2828        let expr = parse("SELECT a, b FROM t");
2829
2830        let node_a = lineage("a", &expr, None, false).unwrap();
2831        let node_b = lineage("b", &expr, None, false).unwrap();
2832
2833        assert_eq!(node_a.name, "a");
2834        assert_eq!(node_b.name, "b");
2835
2836        // Each should trace independently
2837        let names_a = node_a.downstream_names();
2838        let names_b = node_b.downstream_names();
2839        assert!(names_a.iter().any(|n| n == "t.a"));
2840        assert!(names_b.iter().any(|n| n == "t.b"));
2841    }
2842
2843    #[test]
2844    fn test_get_source_tables() {
2845        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2846        let node = lineage("a", &expr, None, false).unwrap();
2847
2848        let tables = get_source_tables(&node);
2849        assert!(
2850            tables.contains("t"),
2851            "Expected source table 't', got: {:?}",
2852            tables
2853        );
2854    }
2855
2856    #[test]
2857    fn test_lineage_column_not_found() {
2858        let expr = parse("SELECT a FROM t");
2859        let result = lineage("nonexistent", &expr, None, false);
2860        assert!(result.is_err());
2861    }
2862
2863    #[test]
2864    fn test_lineage_nested_cte() {
2865        let expr = parse(
2866            "WITH cte1 AS (SELECT a FROM t), \
2867             cte2 AS (SELECT a FROM cte1) \
2868             SELECT a FROM cte2",
2869        );
2870        let node = lineage("a", &expr, None, false).unwrap();
2871
2872        // Should trace through cte2 → cte1 → t
2873        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2874        assert!(
2875            all_names.len() >= 3,
2876            "Expected to trace through nested CTEs, got: {:?}",
2877            all_names
2878        );
2879    }
2880
2881    #[test]
2882    fn test_trim_selects_true() {
2883        let expr = parse("SELECT a, b, c FROM t");
2884        let node = lineage("a", &expr, None, true).unwrap();
2885
2886        // The source should be trimmed to only include 'a'
2887        if let Expression::Select(select) = &node.source {
2888            assert_eq!(
2889                select.expressions.len(),
2890                1,
2891                "Trimmed source should have 1 expression, got {}",
2892                select.expressions.len()
2893            );
2894        } else {
2895            panic!("Expected Select source");
2896        }
2897    }
2898
2899    #[test]
2900    fn test_trim_selects_false() {
2901        let expr = parse("SELECT a, b, c FROM t");
2902        let node = lineage("a", &expr, None, false).unwrap();
2903
2904        // The source should keep all columns
2905        if let Expression::Select(select) = &node.source {
2906            assert_eq!(
2907                select.expressions.len(),
2908                3,
2909                "Untrimmed source should have 3 expressions"
2910            );
2911        } else {
2912            panic!("Expected Select source");
2913        }
2914    }
2915
2916    #[test]
2917    fn test_lineage_expression_in_select() {
2918        let expr = parse("SELECT a + b AS c FROM t");
2919        let node = lineage("c", &expr, None, false).unwrap();
2920
2921        // Should trace to both a and b from t
2922        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2923        assert!(
2924            all_names.len() >= 3,
2925            "Expected to trace a + b to both columns, got: {:?}",
2926            all_names
2927        );
2928    }
2929
2930    #[test]
2931    fn test_set_operation_by_index() {
2932        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2933
2934        // Trace column "a" which is at index 0
2935        let node = lineage("a", &expr, None, false).unwrap();
2936
2937        // UNION branches should be traced by index
2938        assert_eq!(node.downstream.len(), 2);
2939    }
2940
2941    // --- Tests for column lineage inside function calls (issue #18) ---
2942
2943    fn print_node(node: &LineageNode, indent: usize) {
2944        let pad = "  ".repeat(indent);
2945        println!(
2946            "{pad}name={:?} source_name={:?}",
2947            node.name, node.source_name
2948        );
2949        for child in &node.downstream {
2950            print_node(child, indent + 1);
2951        }
2952    }
2953
2954    #[test]
2955    fn test_issue18_repro() {
2956        // Exact scenario from the issue
2957        let query = "SELECT UPPER(name) as upper_name FROM users";
2958        println!("Query: {query}\n");
2959
2960        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2961        let exprs = dialect.parse(query).unwrap();
2962        let expr = &exprs[0];
2963
2964        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
2965        println!("lineage(\"upper_name\"):");
2966        print_node(&node, 1);
2967
2968        let names = node.downstream_names();
2969        assert!(
2970            names.iter().any(|n| n == "users.name"),
2971            "Expected users.name in downstream, got: {:?}",
2972            names
2973        );
2974    }
2975
2976    #[test]
2977    fn test_lineage_upper_function() {
2978        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
2979        let node = lineage("upper_name", &expr, None, false).unwrap();
2980
2981        let names = node.downstream_names();
2982        assert!(
2983            names.iter().any(|n| n == "users.name"),
2984            "Expected users.name in downstream, got: {:?}",
2985            names
2986        );
2987    }
2988
2989    #[test]
2990    fn test_lineage_round_function() {
2991        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
2992        let node = lineage("rounded", &expr, None, false).unwrap();
2993
2994        let names = node.downstream_names();
2995        assert!(
2996            names.iter().any(|n| n == "products.price"),
2997            "Expected products.price in downstream, got: {:?}",
2998            names
2999        );
3000    }
3001
3002    #[test]
3003    fn test_lineage_coalesce_function() {
3004        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
3005        let node = lineage("val", &expr, None, false).unwrap();
3006
3007        let names = node.downstream_names();
3008        assert!(
3009            names.iter().any(|n| n == "t.a"),
3010            "Expected t.a in downstream, got: {:?}",
3011            names
3012        );
3013        assert!(
3014            names.iter().any(|n| n == "t.b"),
3015            "Expected t.b in downstream, got: {:?}",
3016            names
3017        );
3018    }
3019
3020    #[test]
3021    fn test_lineage_count_function() {
3022        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
3023        let node = lineage("cnt", &expr, None, false).unwrap();
3024
3025        let names = node.downstream_names();
3026        assert!(
3027            names.iter().any(|n| n == "t.id"),
3028            "Expected t.id in downstream, got: {:?}",
3029            names
3030        );
3031    }
3032
3033    #[test]
3034    fn test_lineage_sum_function() {
3035        let expr = parse("SELECT SUM(amount) AS total FROM t");
3036        let node = lineage("total", &expr, None, false).unwrap();
3037
3038        let names = node.downstream_names();
3039        assert!(
3040            names.iter().any(|n| n == "t.amount"),
3041            "Expected t.amount in downstream, got: {:?}",
3042            names
3043        );
3044    }
3045
3046    #[test]
3047    fn test_lineage_case_with_nested_functions() {
3048        let expr =
3049            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
3050        let node = lineage("result", &expr, None, false).unwrap();
3051
3052        let names = node.downstream_names();
3053        assert!(
3054            names.iter().any(|n| n == "t.x"),
3055            "Expected t.x in downstream, got: {:?}",
3056            names
3057        );
3058        assert!(
3059            names.iter().any(|n| n == "t.name"),
3060            "Expected t.name in downstream, got: {:?}",
3061            names
3062        );
3063    }
3064
3065    #[test]
3066    fn test_lineage_substring_function() {
3067        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
3068        let node = lineage("short", &expr, None, false).unwrap();
3069
3070        let names = node.downstream_names();
3071        assert!(
3072            names.iter().any(|n| n == "t.name"),
3073            "Expected t.name in downstream, got: {:?}",
3074            names
3075        );
3076    }
3077
3078    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
3079
3080    #[test]
3081    fn test_lineage_cte_select_star() {
3082        // Ported from sqlglot: test_lineage_source_with_star
3083        // WITH y AS (SELECT * FROM x) SELECT a FROM y
3084        // After star expansion: SELECT y.a AS a FROM y
3085        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
3086        let node = lineage("a", &expr, None, false).unwrap();
3087
3088        assert_eq!(node.name, "a");
3089        // Should successfully resolve column 'a' through the CTE
3090        // (previously failed with "Cannot find column 'a' in query")
3091        assert!(
3092            !node.downstream.is_empty(),
3093            "Expected downstream nodes tracing through CTE, got none"
3094        );
3095    }
3096
3097    #[test]
3098    fn test_lineage_cte_select_star_renamed_column() {
3099        // dbt standard pattern: CTE with column rename + outer SELECT *
3100        // This is the primary use case for dbt projects (jaffle-shop etc.)
3101        let expr =
3102            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
3103        let node = lineage("customer_id", &expr, None, false).unwrap();
3104
3105        assert_eq!(node.name, "customer_id");
3106        // Should trace customer_id → renamed CTE → source.id
3107        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3108        assert!(
3109            all_names.len() >= 2,
3110            "Expected at least 2 nodes (customer_id → source), got: {:?}",
3111            all_names
3112        );
3113    }
3114
3115    #[test]
3116    fn test_lineage_cte_select_star_multiple_columns() {
3117        // CTE exposes multiple columns, outer SELECT * should resolve each
3118        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
3119
3120        for col in &["a", "b", "c"] {
3121            let node = lineage(col, &expr, None, false).unwrap();
3122            assert_eq!(node.name, *col);
3123            // Verify lineage resolves without error (star expanded to explicit columns)
3124            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3125            assert!(
3126                all_names.len() >= 2,
3127                "Expected at least 2 nodes for column {}, got: {:?}",
3128                col,
3129                all_names
3130            );
3131        }
3132    }
3133
3134    #[test]
3135    fn test_lineage_nested_cte_select_star() {
3136        // Nested CTE star expansion: cte2 references cte1 via SELECT *
3137        let expr = parse(
3138            "WITH cte1 AS (SELECT a FROM t), \
3139             cte2 AS (SELECT * FROM cte1) \
3140             SELECT * FROM cte2",
3141        );
3142        let node = lineage("a", &expr, None, false).unwrap();
3143
3144        assert_eq!(node.name, "a");
3145        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3146        assert!(
3147            all_names.len() >= 3,
3148            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
3149            all_names
3150        );
3151    }
3152
3153    #[test]
3154    fn test_lineage_three_level_nested_cte_star() {
3155        // Three-level nested CTE: cte3 → cte2 → cte1 → t
3156        let expr = parse(
3157            "WITH cte1 AS (SELECT x FROM t), \
3158             cte2 AS (SELECT * FROM cte1), \
3159             cte3 AS (SELECT * FROM cte2) \
3160             SELECT * FROM cte3",
3161        );
3162        let node = lineage("x", &expr, None, false).unwrap();
3163
3164        assert_eq!(node.name, "x");
3165        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3166        assert!(
3167            all_names.len() >= 4,
3168            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
3169            all_names
3170        );
3171    }
3172
3173    #[test]
3174    fn test_lineage_cte_union_star() {
3175        // CTE with UNION body, outer SELECT * should resolve from left branch
3176        let expr = parse(
3177            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
3178             SELECT * FROM cte",
3179        );
3180        let node = lineage("a", &expr, None, false).unwrap();
3181
3182        assert_eq!(node.name, "a");
3183        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3184        assert!(
3185            all_names.len() >= 2,
3186            "Expected at least 2 nodes for CTE union star, got: {:?}",
3187            all_names
3188        );
3189    }
3190
3191    #[test]
3192    fn test_lineage_cte_star_unknown_table() {
3193        // When CTE references an unknown table, star expansion is skipped gracefully
3194        // and lineage falls back to normal resolution (which may fail)
3195        let expr = parse(
3196            "WITH cte AS (SELECT * FROM unknown_table) \
3197             SELECT * FROM cte",
3198        );
3199        // This should not panic — it may succeed or fail depending on resolution,
3200        // but should not crash
3201        let _result = lineage("x", &expr, None, false);
3202    }
3203
3204    #[test]
3205    fn test_lineage_cte_explicit_columns() {
3206        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
3207        let expr = parse(
3208            "WITH cte(x, y) AS (SELECT a, b FROM t) \
3209             SELECT * FROM cte",
3210        );
3211        let node = lineage("x", &expr, None, false).unwrap();
3212        assert_eq!(node.name, "x");
3213    }
3214
3215    #[test]
3216    fn test_lineage_cte_qualified_star() {
3217        // Qualified star: SELECT cte.* FROM cte
3218        let expr = parse(
3219            "WITH cte AS (SELECT a, b FROM t) \
3220             SELECT cte.* FROM cte",
3221        );
3222        for col in &["a", "b"] {
3223            let node = lineage(col, &expr, None, false).unwrap();
3224            assert_eq!(node.name, *col);
3225            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3226            assert!(
3227                all_names.len() >= 2,
3228                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
3229                col,
3230                all_names
3231            );
3232        }
3233    }
3234
3235    #[test]
3236    fn test_lineage_subquery_select_star() {
3237        // Ported from sqlglot: test_select_star
3238        // SELECT x FROM (SELECT * FROM table_a)
3239        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
3240        let node = lineage("x", &expr, None, false).unwrap();
3241
3242        assert_eq!(node.name, "x");
3243        assert!(
3244            !node.downstream.is_empty(),
3245            "Expected downstream nodes for subquery with SELECT *, got none"
3246        );
3247    }
3248
3249    #[test]
3250    fn test_lineage_cte_star_with_schema_external_table() {
3251        // CTE references an external table via SELECT * — schema enables expansion
3252        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
3253SELECT * FROM orders"#;
3254        let expr = parse(sql);
3255
3256        let mut schema = MappingSchema::new();
3257        let cols = vec![
3258            ("order_id".to_string(), DataType::Unknown),
3259            ("customer_id".to_string(), DataType::Unknown),
3260            ("amount".to_string(), DataType::Unknown),
3261        ];
3262        schema.add_table("stg_orders", &cols, None).unwrap();
3263
3264        let node = lineage_with_schema(
3265            "order_id",
3266            &expr,
3267            Some(&schema as &dyn Schema),
3268            None,
3269            false,
3270        )
3271        .unwrap();
3272        assert_eq!(node.name, "order_id");
3273    }
3274
3275    #[test]
3276    fn test_lineage_cte_star_with_schema_three_part_name() {
3277        // CTE references an external table with fully-qualified 3-part name
3278        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
3279SELECT * FROM orders"#;
3280        let expr = parse(sql);
3281
3282        let mut schema = MappingSchema::new();
3283        let cols = vec![
3284            ("order_id".to_string(), DataType::Unknown),
3285            ("customer_id".to_string(), DataType::Unknown),
3286        ];
3287        schema.add_table("db.schema.stg_orders", &cols, None).unwrap();
3288
3289        let node = lineage_with_schema(
3290            "customer_id",
3291            &expr,
3292            Some(&schema as &dyn Schema),
3293            None,
3294            false,
3295        )
3296        .unwrap();
3297        assert_eq!(node.name, "customer_id");
3298    }
3299
3300    #[test]
3301    fn test_lineage_cte_star_with_schema_nested() {
3302        // Nested CTEs: outer CTE references inner CTE with SELECT *,
3303        // inner CTE references external table with SELECT *
3304        let sql = r#"WITH
3305            raw AS (SELECT * FROM external_table),
3306            enriched AS (SELECT * FROM raw)
3307        SELECT * FROM enriched"#;
3308        let expr = parse(sql);
3309
3310        let mut schema = MappingSchema::new();
3311        let cols = vec![
3312            ("id".to_string(), DataType::Unknown),
3313            ("name".to_string(), DataType::Unknown),
3314        ];
3315        schema.add_table("external_table", &cols, None).unwrap();
3316
3317        let node = lineage_with_schema(
3318            "name",
3319            &expr,
3320            Some(&schema as &dyn Schema),
3321            None,
3322            false,
3323        )
3324        .unwrap();
3325        assert_eq!(node.name, "name");
3326    }
3327
3328    #[test]
3329    fn test_lineage_cte_qualified_star_with_schema() {
3330        // CTE uses qualified star (orders.*) from a CTE whose columns
3331        // come from an external table via SELECT *
3332        let sql = r#"WITH
3333            orders AS (SELECT * FROM stg_orders),
3334            enriched AS (
3335                SELECT orders.*, 'extra' AS extra
3336                FROM orders
3337            )
3338        SELECT * FROM enriched"#;
3339        let expr = parse(sql);
3340
3341        let mut schema = MappingSchema::new();
3342        let cols = vec![
3343            ("order_id".to_string(), DataType::Unknown),
3344            ("total".to_string(), DataType::Unknown),
3345        ];
3346        schema.add_table("stg_orders", &cols, None).unwrap();
3347
3348        let node = lineage_with_schema(
3349            "order_id",
3350            &expr,
3351            Some(&schema as &dyn Schema),
3352            None,
3353            false,
3354        )
3355        .unwrap();
3356        assert_eq!(node.name, "order_id");
3357
3358        // Also verify the extra column works
3359        let extra = lineage_with_schema(
3360            "extra",
3361            &expr,
3362            Some(&schema as &dyn Schema),
3363            None,
3364            false,
3365        )
3366        .unwrap();
3367        assert_eq!(extra.name, "extra");
3368    }
3369
3370    #[test]
3371    fn test_lineage_cte_star_without_schema_still_works() {
3372        // Without schema, CTE-to-CTE star expansion still works
3373        let sql = r#"WITH
3374            cte1 AS (SELECT id, name FROM raw_table),
3375            cte2 AS (SELECT * FROM cte1)
3376        SELECT * FROM cte2"#;
3377        let expr = parse(sql);
3378
3379        // No schema — should still resolve through CTE chain
3380        let node = lineage("id", &expr, None, false).unwrap();
3381        assert_eq!(node.name, "id");
3382    }
3383
3384    #[test]
3385    fn test_lineage_nested_cte_star_with_join_and_schema() {
3386        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
3387        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
3388        let sql = r#"WITH
3389base_orders AS (
3390    SELECT * FROM stg_orders
3391),
3392with_payments AS (
3393    SELECT
3394        base_orders.*,
3395        p.amount
3396    FROM base_orders
3397    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
3398),
3399final_cte AS (
3400    SELECT * FROM with_payments
3401)
3402SELECT * FROM final_cte"#;
3403        let expr = parse(sql);
3404
3405        let mut schema = MappingSchema::new();
3406        let order_cols = vec![
3407            ("order_id".to_string(), crate::expressions::DataType::Unknown),
3408            ("customer_id".to_string(), crate::expressions::DataType::Unknown),
3409            ("status".to_string(), crate::expressions::DataType::Unknown),
3410        ];
3411        let pay_cols = vec![
3412            ("payment_id".to_string(), crate::expressions::DataType::Unknown),
3413            ("order_id".to_string(), crate::expressions::DataType::Unknown),
3414            ("amount".to_string(), crate::expressions::DataType::Unknown),
3415        ];
3416        schema.add_table("stg_orders", &order_cols, None).unwrap();
3417        schema.add_table("stg_payments", &pay_cols, None).unwrap();
3418
3419        // order_id should trace back to stg_orders
3420        let node = lineage_with_schema(
3421            "order_id", &expr, Some(&schema as &dyn Schema), None, false,
3422        ).unwrap();
3423        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3424
3425        // The leaf should be "stg_orders.order_id" (not just "order_id")
3426        let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("order_id"));
3427        assert!(has_table_qualified,
3428            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}", all_names);
3429
3430        // amount should trace back to stg_payments
3431        let node = lineage_with_schema(
3432            "amount", &expr, Some(&schema as &dyn Schema), None, false,
3433        ).unwrap();
3434        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3435
3436        let has_table_qualified = all_names.iter().any(|n| n.contains('.') && n.contains("amount"));
3437        assert!(has_table_qualified,
3438            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}", all_names);
3439    }
3440
3441    #[test]
3442    fn test_lineage_cte_alias_resolution() {
3443        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
3444        let sql = r#"WITH import_stg_items AS (
3445    SELECT item_id, name, status FROM stg_items
3446)
3447SELECT base.item_id, base.status
3448FROM import_stg_items AS base"#;
3449        let expr = parse(sql);
3450
3451        let node = lineage("item_id", &expr, None, false).unwrap();
3452        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3453        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
3454        assert!(
3455            all_names.iter().any(|n| n == "stg_items.item_id"),
3456            "Expected leaf 'stg_items.item_id', got: {:?}",
3457            all_names
3458        );
3459    }
3460
3461    #[test]
3462    fn test_lineage_cte_alias_with_schema_and_star() {
3463        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
3464        let sql = r#"WITH import_stg AS (
3465    SELECT * FROM stg_items
3466)
3467SELECT base.item_id, base.status
3468FROM import_stg AS base"#;
3469        let expr = parse(sql);
3470
3471        let mut schema = MappingSchema::new();
3472        schema
3473            .add_table(
3474                "stg_items",
3475                &[
3476                    ("item_id".to_string(), DataType::Unknown),
3477                    ("name".to_string(), DataType::Unknown),
3478                    ("status".to_string(), DataType::Unknown),
3479                ],
3480                None,
3481            )
3482            .unwrap();
3483
3484        let node = lineage_with_schema(
3485            "item_id",
3486            &expr,
3487            Some(&schema as &dyn Schema),
3488            None,
3489            false,
3490        )
3491        .unwrap();
3492        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3493        assert!(
3494            all_names.iter().any(|n| n == "stg_items.item_id"),
3495            "Expected leaf 'stg_items.item_id', got: {:?}",
3496            all_names
3497        );
3498    }
3499
3500    #[test]
3501    fn test_lineage_cte_alias_with_join() {
3502        // Multiple CTE aliases in a JOIN: each should resolve independently
3503        let sql = r#"WITH
3504    import_users AS (SELECT id, name FROM users),
3505    import_orders AS (SELECT id, user_id, amount FROM orders)
3506SELECT u.name, o.amount
3507FROM import_users AS u
3508LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
3509        let expr = parse(sql);
3510
3511        let node = lineage("name", &expr, None, false).unwrap();
3512        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3513        assert!(
3514            all_names.iter().any(|n| n == "users.name"),
3515            "Expected leaf 'users.name', got: {:?}",
3516            all_names
3517        );
3518
3519        let node = lineage("amount", &expr, None, false).unwrap();
3520        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3521        assert!(
3522            all_names.iter().any(|n| n == "orders.amount"),
3523            "Expected leaf 'orders.amount', got: {:?}",
3524            all_names
3525        );
3526    }
3527
3528    // -----------------------------------------------------------------------
3529    // Quoted CTE name tests — verifying SQL identifier case semantics
3530    // -----------------------------------------------------------------------
3531
3532    #[test]
3533    fn test_lineage_unquoted_cte_case_insensitive() {
3534        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
3535        // MyCte and MYCTE should match.
3536        let expr = parse(
3537            "WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE",
3538        );
3539        let node = lineage("col", &expr, None, false).unwrap();
3540        assert_eq!(node.name, "col");
3541        assert!(
3542            !node.downstream.is_empty(),
3543            "Unquoted CTE should resolve case-insensitively"
3544        );
3545    }
3546
3547    #[test]
3548    fn test_lineage_quoted_cte_case_preserved() {
3549        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
3550        let expr = parse(
3551            r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#,
3552        );
3553        let node = lineage("col", &expr, None, false).unwrap();
3554        assert_eq!(node.name, "col");
3555        assert!(
3556            !node.downstream.is_empty(),
3557            "Quoted CTE with matching case should resolve"
3558        );
3559    }
3560
3561    #[test]
3562    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
3563        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
3564        // sqlglot treats this as a table reference, not a CTE match.
3565        // Star expansion should NOT resolve through the CTE.
3566        let expr = parse(
3567            r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#,
3568        );
3569        // lineage("col", ...) should fail because "mycte" is treated as an external
3570        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
3571        let result = lineage("col", &expr, None, false);
3572        assert!(
3573            result.is_err(),
3574            "Quoted CTE with case mismatch should not expand star: {:?}",
3575            result
3576        );
3577    }
3578
3579    #[test]
3580    fn test_lineage_mixed_quoted_unquoted_cte() {
3581        // Mix of unquoted and quoted CTEs in a nested chain.
3582        let expr = parse(
3583            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
3584        );
3585        let node = lineage("a", &expr, None, false).unwrap();
3586        assert_eq!(node.name, "a");
3587        assert!(
3588            !node.downstream.is_empty(),
3589            "Mixed quoted/unquoted CTE chain should resolve"
3590        );
3591    }
3592
3593    // -----------------------------------------------------------------------
3594    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
3595    // -----------------------------------------------------------------------
3596    //
3597    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
3598    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
3599    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
3600    // direct string comparison for CTE name matching, ignoring the quoted status.
3601    //
3602    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
3603    // and unquoted as case-insensitive. The scope system should do the same.
3604    //
3605    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
3606    // which is broader than the star expansion scope of this PR.
3607
3608    #[test]
3609    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
3610        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
3611        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
3612        // as "mycte" incorrectly resolves to the CTE.
3613        //
3614        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
3615        // case-sensitive: "mycte" should NOT match CTE "MyCte".
3616        //
3617        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3618        // this test should fail — update the assertion to match correct behavior:
3619        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
3620        let expr = parse(
3621            r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#,
3622        );
3623        let node = lineage("col", &expr, None, false).unwrap();
3624        assert!(!node.downstream.is_empty());
3625        let child = &node.downstream[0];
3626        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
3627        assert_eq!(
3628            child.source_name, "MyCte",
3629            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3630             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3631        );
3632    }
3633
3634    #[test]
3635    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
3636        // Known bug: same as above but with qualified column reference ("mycte".col).
3637        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
3638        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
3639        //
3640        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
3641        // this test should fail — update to assert source_name != "MyCte".
3642        let expr = parse(
3643            r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#,
3644        );
3645        let node = lineage("col", &expr, None, false).unwrap();
3646        assert!(!node.downstream.is_empty());
3647        let child = &node.downstream[0];
3648        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
3649        assert_eq!(
3650            child.source_name, "MyCte",
3651            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
3652             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
3653        );
3654    }
3655
3656    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
3657
3658    /// sqlglot: test_node_name_doesnt_contain_comment
3659    /// Comments in column expressions should not affect lineage resolution.
3660    /// NOTE: This test uses SELECT * from a derived table, which is a separate
3661    /// known limitation in polyglot-sql (star expansion in subqueries).
3662    #[test]
3663    #[ignore = "requires derived table star expansion (separate issue)"]
3664    fn test_node_name_doesnt_contain_comment() {
3665        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
3666        let node = lineage("x", &expr, None, false).unwrap();
3667
3668        assert_eq!(node.name, "x");
3669        assert!(!node.downstream.is_empty());
3670    }
3671
3672    /// A line comment between SELECT and the first column wraps the column
3673    /// in an Annotated node. Lineage must unwrap it to find the column name.
3674    /// Verify that commented and uncommented queries produce identical lineage.
3675    #[test]
3676    fn test_comment_before_first_column_in_cte() {
3677        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
3678        let sql_without_comment = "with t as (select 1 as a) select a from t";
3679
3680        // Without comment — baseline
3681        let expr_ok = parse(sql_without_comment);
3682        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
3683
3684        // With comment — should produce identical lineage
3685        let expr_comment = parse(sql_with_comment);
3686        let node_comment = lineage("a", &expr_comment, None, false)
3687            .expect("with comment before first column should succeed");
3688
3689        assert_eq!(node_ok.name, node_comment.name, "node names should match");
3690        assert_eq!(
3691            node_ok.downstream_names(),
3692            node_comment.downstream_names(),
3693            "downstream lineage should be identical with or without comment"
3694        );
3695    }
3696
3697    /// Block comment between SELECT and first column.
3698    #[test]
3699    fn test_block_comment_before_first_column() {
3700        let sql = "with t as (select 1 as a) select /* section */ a from t";
3701        let expr = parse(sql);
3702        let node = lineage("a", &expr, None, false)
3703            .expect("block comment before first column should succeed");
3704        assert_eq!(node.name, "a");
3705        assert!(
3706            !node.downstream.is_empty(),
3707            "should have downstream lineage"
3708        );
3709    }
3710
3711    /// Comment before first column should not affect second column resolution.
3712    #[test]
3713    fn test_comment_before_first_column_second_col_ok() {
3714        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
3715        let expr = parse(sql);
3716
3717        let node_a =
3718            lineage("a", &expr, None, false).expect("column a with comment should succeed");
3719        assert_eq!(node_a.name, "a");
3720
3721        let node_b =
3722            lineage("b", &expr, None, false).expect("column b with comment should succeed");
3723        assert_eq!(node_b.name, "b");
3724    }
3725
3726    /// Aliased column with preceding comment.
3727    #[test]
3728    fn test_comment_before_aliased_column() {
3729        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
3730        let expr = parse(sql);
3731        let node =
3732            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
3733        assert_eq!(node.name, "y");
3734        assert!(
3735            !node.downstream.is_empty(),
3736            "aliased column should have downstream lineage"
3737        );
3738    }
3739}