Skip to main content

polyglot_sql/
lineage.rs

//! Column Lineage Tracking
//!
//! This module provides functionality to track column lineage through SQL queries,
//! building a graph of how columns flow from source tables to the result set.
//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
7
8use crate::dialects::DialectType;
9use crate::expressions::Expression;
10use crate::optimizer::annotate_types::annotate_types;
11use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
12use crate::schema::{normalize_name, Schema};
13use crate::scope::{build_scope, Scope};
14use crate::traversal::ExpressionWalk;
15use crate::{Error, Result};
16use serde::{Deserialize, Serialize};
17use std::collections::HashSet;
18
/// A node in the column lineage graph.
///
/// Each node describes one column (or `*`, or a set-operation result) at one
/// step of the query and links to further nodes through `downstream`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LineageNode {
    /// Name of this lineage step (e.g., "table.column")
    pub name: String,
    /// The expression at this node
    pub expression: Expression,
    /// The source expression (the full query context)
    pub source: Expression,
    /// Child nodes in the graph. The builders in this module fill this with the
    /// nodes this column is derived from (its inputs), so despite the field name
    /// these are the origins of this node rather than its consumers.
    pub downstream: Vec<LineageNode>,
    /// Source name (e.g., for derived tables); an empty string when not set
    pub source_name: String,
    /// Reference node name (e.g., for CTEs); an empty string when not set
    pub reference_node_name: String,
}
35
36impl LineageNode {
37    /// Create a new lineage node
38    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
39        Self {
40            name: name.into(),
41            expression,
42            source,
43            downstream: Vec::new(),
44            source_name: String::new(),
45            reference_node_name: String::new(),
46        }
47    }
48
49    /// Iterate over all nodes in the lineage graph using DFS
50    pub fn walk(&self) -> LineageWalker<'_> {
51        LineageWalker { stack: vec![self] }
52    }
53
54    /// Get all downstream column names
55    pub fn downstream_names(&self) -> Vec<String> {
56        self.downstream.iter().map(|n| n.name.clone()).collect()
57    }
58}
59
/// Iterator for walking the lineage graph.
///
/// Created by `LineageNode::walk`; yields nodes depth-first.
pub struct LineageWalker<'a> {
    /// Nodes still waiting to be visited; the next node to yield is the last element.
    stack: Vec<&'a LineageNode>,
}
64
65impl<'a> Iterator for LineageWalker<'a> {
66    type Item = &'a LineageNode;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        if let Some(node) = self.stack.pop() {
70            // Add children in reverse order so they're visited in order
71            for child in node.downstream.iter().rev() {
72                self.stack.push(child);
73            }
74            Some(node)
75        } else {
76            None
77        }
78    }
79}
80
81// ---------------------------------------------------------------------------
82// ColumnRef: name or positional index for column lookup
83// ---------------------------------------------------------------------------
84
/// Column reference for lineage tracing — by name or positional index.
enum ColumnRef<'a> {
    /// Look the column up by its output name or alias.
    Name(&'a str),
    /// Look the column up by its zero-based position in the select list.
    Index(usize),
}
90
91// ---------------------------------------------------------------------------
92// Public API
93// ---------------------------------------------------------------------------
94
/// Build the lineage graph for a column in a SQL query
///
/// This is the schema-less entry point: the expression is used as given, with
/// no column qualification or type annotation applied first.
///
/// # Arguments
/// * `column` - The column name to trace lineage for
/// * `sql` - The already-parsed SQL expression (SELECT, UNION, etc.)
/// * `dialect` - Optional dialect, used when normalizing column names for lookup
/// * `trim_selects` - If true, trim the source SELECT to only include the target column
///
/// # Returns
/// The root lineage node for the specified column
///
/// # Errors
/// Returns an error when `column` cannot be found in the query's select list.
///
/// # Example
/// ```ignore
/// use polyglot_sql::lineage::lineage;
/// use polyglot_sql::parse_one;
/// use polyglot_sql::DialectType;
///
/// let sql = "SELECT a, b + 1 AS c FROM t";
/// let expr = parse_one(sql, DialectType::Generic).unwrap();
/// let node = lineage("c", &expr, None, false).unwrap();
/// ```
pub fn lineage(
    column: &str,
    sql: &Expression,
    dialect: Option<DialectType>,
    trim_selects: bool,
) -> Result<LineageNode> {
    lineage_from_expression(column, sql, dialect, trim_selects)
}
124
125/// Build the lineage graph for a column in a SQL query using optional schema metadata.
126///
127/// When `schema` is provided, the query is first qualified with
128/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
129/// ambiguous column references.
130///
131/// # Arguments
132/// * `column` - The column name to trace lineage for
133/// * `sql` - The SQL expression (SELECT, UNION, etc.)
134/// * `schema` - Optional schema used for qualification
135/// * `dialect` - Optional dialect for qualification and lineage handling
136/// * `trim_selects` - If true, trim the source SELECT to only include the target column
137///
138/// # Returns
139/// The root lineage node for the specified column
140pub fn lineage_with_schema(
141    column: &str,
142    sql: &Expression,
143    schema: Option<&dyn Schema>,
144    dialect: Option<DialectType>,
145    trim_selects: bool,
146) -> Result<LineageNode> {
147    let mut qualified_expression = if let Some(schema) = schema {
148        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
149            QualifyColumnsOptions::new().with_dialect(dialect_type)
150        } else {
151            QualifyColumnsOptions::new()
152        };
153
154        qualify_columns(sql.clone(), schema, &options).map_err(|e| {
155            Error::internal(format!("Lineage qualification failed with schema: {}", e))
156        })?
157    } else {
158        sql.clone()
159    };
160
161    // Annotate types in-place so lineage nodes carry type information
162    annotate_types(&mut qualified_expression, schema, dialect);
163
164    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
165}
166
167fn lineage_from_expression(
168    column: &str,
169    sql: &Expression,
170    dialect: Option<DialectType>,
171    trim_selects: bool,
172) -> Result<LineageNode> {
173    let scope = build_scope(sql);
174    to_node(
175        ColumnRef::Name(column),
176        &scope,
177        dialect,
178        "",
179        "",
180        "",
181        trim_selects,
182    )
183}
184
185/// Get all source tables from a lineage graph
186pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
187    let mut tables = HashSet::new();
188    collect_source_tables(node, &mut tables);
189    tables
190}
191
192/// Recursively collect source table names from lineage graph
193pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
194    if let Expression::Table(table) = &node.source {
195        tables.insert(table.name.name.clone());
196    }
197    for child in &node.downstream {
198        collect_source_tables(child, tables);
199    }
200}
201
202// ---------------------------------------------------------------------------
203// Core recursive lineage builder
204// ---------------------------------------------------------------------------
205
206/// Recursively build a lineage node for a column in a scope.
207fn to_node(
208    column: ColumnRef<'_>,
209    scope: &Scope,
210    dialect: Option<DialectType>,
211    scope_name: &str,
212    source_name: &str,
213    reference_node_name: &str,
214    trim_selects: bool,
215) -> Result<LineageNode> {
216    to_node_inner(
217        column,
218        scope,
219        dialect,
220        scope_name,
221        source_name,
222        reference_node_name,
223        trim_selects,
224        &[],
225    )
226}
227
228fn to_node_inner(
229    column: ColumnRef<'_>,
230    scope: &Scope,
231    dialect: Option<DialectType>,
232    scope_name: &str,
233    source_name: &str,
234    reference_node_name: &str,
235    trim_selects: bool,
236    ancestor_cte_scopes: &[Scope],
237) -> Result<LineageNode> {
238    let scope_expr = &scope.expression;
239
240    // Build combined CTE scopes: current scope's cte_scopes + ancestors
241    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
242    for s in ancestor_cte_scopes {
243        all_cte_scopes.push(s);
244    }
245
246    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
247    //    but we need the inner query (SELECT/UNION) for column lookup.
248    let effective_expr = match scope_expr {
249        Expression::Cte(cte) => &cte.this,
250        other => other,
251    };
252
253    // 1. Set operations (UNION / INTERSECT / EXCEPT)
254    if matches!(
255        effective_expr,
256        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
257    ) {
258        // For CTE wrapping a set op, create a temporary scope with the inner expression
259        if matches!(scope_expr, Expression::Cte(_)) {
260            let mut inner_scope = Scope::new(effective_expr.clone());
261            inner_scope.union_scopes = scope.union_scopes.clone();
262            inner_scope.sources = scope.sources.clone();
263            inner_scope.cte_sources = scope.cte_sources.clone();
264            inner_scope.cte_scopes = scope.cte_scopes.clone();
265            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
266            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
267            return handle_set_operation(
268                &column,
269                &inner_scope,
270                dialect,
271                scope_name,
272                source_name,
273                reference_node_name,
274                trim_selects,
275                ancestor_cte_scopes,
276            );
277        }
278        return handle_set_operation(
279            &column,
280            scope,
281            dialect,
282            scope_name,
283            source_name,
284            reference_node_name,
285            trim_selects,
286            ancestor_cte_scopes,
287        );
288    }
289
290    // 2. Find the select expression for this column
291    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
292    let column_name = resolve_column_name(&column, &select_expr);
293
294    // 3. Trim source if requested
295    let node_source = if trim_selects {
296        trim_source(effective_expr, &select_expr)
297    } else {
298        effective_expr.clone()
299    };
300
301    // 4. Create the lineage node
302    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
303    node.source_name = source_name.to_string();
304    node.reference_node_name = reference_node_name.to_string();
305
306    // 5. Star handling — add downstream for each source
307    if matches!(&select_expr, Expression::Star(_)) {
308        for (name, source_info) in &scope.sources {
309            let child = LineageNode::new(
310                format!("{}.*", name),
311                Expression::Star(crate::expressions::Star {
312                    table: None,
313                    except: None,
314                    replace: None,
315                    rename: None,
316                    trailing_comments: vec![],
317                    span: None,
318                }),
319                source_info.expression.clone(),
320            );
321            node.downstream.push(child);
322        }
323        return Ok(node);
324    }
325
326    // 6. Subqueries in select — trace through scalar subqueries
327    let subqueries: Vec<&Expression> =
328        select_expr.find_all(|e| matches!(e, Expression::Subquery(sq) if sq.alias.is_none()));
329    for sq_expr in subqueries {
330        if let Expression::Subquery(sq) = sq_expr {
331            for sq_scope in &scope.subquery_scopes {
332                if sq_scope.expression == sq.this {
333                    if let Ok(child) = to_node_inner(
334                        ColumnRef::Index(0),
335                        sq_scope,
336                        dialect,
337                        &column_name,
338                        "",
339                        "",
340                        trim_selects,
341                        ancestor_cte_scopes,
342                    ) {
343                        node.downstream.push(child);
344                    }
345                    break;
346                }
347            }
348        }
349    }
350
351    // 7. Column references — trace each column to its source
352    let col_refs = find_column_refs_in_expr(&select_expr);
353    for col_ref in col_refs {
354        let col_name = &col_ref.column;
355        if let Some(ref table_id) = col_ref.table {
356            let tbl = &table_id.name;
357            resolve_qualified_column(
358                &mut node,
359                scope,
360                dialect,
361                tbl,
362                col_name,
363                &column_name,
364                trim_selects,
365                &all_cte_scopes,
366            );
367        } else {
368            resolve_unqualified_column(
369                &mut node,
370                scope,
371                dialect,
372                col_name,
373                &column_name,
374                trim_selects,
375                &all_cte_scopes,
376            );
377        }
378    }
379
380    Ok(node)
381}
382
383// ---------------------------------------------------------------------------
384// Set operation handling
385// ---------------------------------------------------------------------------
386
387fn handle_set_operation(
388    column: &ColumnRef<'_>,
389    scope: &Scope,
390    dialect: Option<DialectType>,
391    scope_name: &str,
392    source_name: &str,
393    reference_node_name: &str,
394    trim_selects: bool,
395    ancestor_cte_scopes: &[Scope],
396) -> Result<LineageNode> {
397    let scope_expr = &scope.expression;
398
399    // Determine column index
400    let col_index = match column {
401        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
402        ColumnRef::Index(i) => *i,
403    };
404
405    let col_name = match column {
406        ColumnRef::Name(name) => name.to_string(),
407        ColumnRef::Index(_) => format!("_{col_index}"),
408    };
409
410    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
411    node.source_name = source_name.to_string();
412    node.reference_node_name = reference_node_name.to_string();
413
414    // Recurse into each union branch
415    for branch_scope in &scope.union_scopes {
416        if let Ok(child) = to_node_inner(
417            ColumnRef::Index(col_index),
418            branch_scope,
419            dialect,
420            scope_name,
421            "",
422            "",
423            trim_selects,
424            ancestor_cte_scopes,
425        ) {
426            node.downstream.push(child);
427        }
428    }
429
430    Ok(node)
431}
432
433// ---------------------------------------------------------------------------
434// Column resolution helpers
435// ---------------------------------------------------------------------------
436
437fn resolve_qualified_column(
438    node: &mut LineageNode,
439    scope: &Scope,
440    dialect: Option<DialectType>,
441    table: &str,
442    col_name: &str,
443    parent_name: &str,
444    trim_selects: bool,
445    all_cte_scopes: &[&Scope],
446) {
447    // Check if table is a CTE reference (cte_sources tracks CTE names)
448    if scope.cte_sources.contains_key(table) {
449        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, table) {
450            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
451            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
452            if let Ok(child) = to_node_inner(
453                ColumnRef::Name(col_name),
454                child_scope,
455                dialect,
456                parent_name,
457                table,
458                parent_name,
459                trim_selects,
460                &ancestors,
461            ) {
462                node.downstream.push(child);
463                return;
464            }
465        }
466    }
467
468    // Check if table is a derived table (is_scope = true in sources)
469    if let Some(source_info) = scope.sources.get(table) {
470        if source_info.is_scope {
471            if let Some(child_scope) = find_child_scope(scope, table) {
472                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
473                if let Ok(child) = to_node_inner(
474                    ColumnRef::Name(col_name),
475                    child_scope,
476                    dialect,
477                    parent_name,
478                    table,
479                    parent_name,
480                    trim_selects,
481                    &ancestors,
482                ) {
483                    node.downstream.push(child);
484                    return;
485                }
486            }
487        }
488    }
489
490    // Base table source found in current scope: preserve alias in the display name
491    // but store the resolved table expression and name for downstream consumers.
492    if let Some(source_info) = scope.sources.get(table) {
493        if !source_info.is_scope {
494            node.downstream.push(make_table_column_node_from_source(
495                table,
496                col_name,
497                &source_info.expression,
498            ));
499            return;
500        }
501    }
502
503    // Base table or unresolved — terminal node
504    node.downstream
505        .push(make_table_column_node(table, col_name));
506}
507
508fn resolve_unqualified_column(
509    node: &mut LineageNode,
510    scope: &Scope,
511    dialect: Option<DialectType>,
512    col_name: &str,
513    parent_name: &str,
514    trim_selects: bool,
515    all_cte_scopes: &[&Scope],
516) {
517    // Try to find which source this column belongs to.
518    // Build the source list from the actual FROM/JOIN clauses to avoid
519    // mixing in CTE definitions that are in scope but not referenced.
520    let from_source_names = source_names_from_from_join(scope);
521
522    if from_source_names.len() == 1 {
523        let tbl = &from_source_names[0];
524        resolve_qualified_column(
525            node,
526            scope,
527            dialect,
528            tbl,
529            col_name,
530            parent_name,
531            trim_selects,
532            all_cte_scopes,
533        );
534        return;
535    }
536
537    // Multiple sources — can't resolve without schema info, add unqualified node
538    let child = LineageNode::new(
539        col_name.to_string(),
540        Expression::Column(Box::new(crate::expressions::Column {
541            name: crate::expressions::Identifier::new(col_name.to_string()),
542            table: None,
543            join_mark: false,
544            trailing_comments: vec![],
545            span: None,
546            inferred_type: None,
547        })),
548        node.source.clone(),
549    );
550    node.downstream.push(child);
551}
552
553fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
554    fn source_name(expr: &Expression) -> Option<String> {
555        match expr {
556            Expression::Table(table) => Some(
557                table
558                    .alias
559                    .as_ref()
560                    .map(|a| a.name.clone())
561                    .unwrap_or_else(|| table.name.name.clone()),
562            ),
563            Expression::Subquery(subquery) => {
564                subquery.alias.as_ref().map(|alias| alias.name.clone())
565            }
566            Expression::Paren(paren) => source_name(&paren.this),
567            _ => None,
568        }
569    }
570
571    let effective_expr = match &scope.expression {
572        Expression::Cte(cte) => &cte.this,
573        expr => expr,
574    };
575
576    let mut names = Vec::new();
577    let mut seen = std::collections::HashSet::new();
578
579    if let Expression::Select(select) = effective_expr {
580        if let Some(from) = &select.from {
581            for expr in &from.expressions {
582                if let Some(name) = source_name(expr) {
583                    if !name.is_empty() && seen.insert(name.clone()) {
584                        names.push(name);
585                    }
586                }
587            }
588        }
589        for join in &select.joins {
590            if let Some(name) = source_name(&join.this) {
591                if !name.is_empty() && seen.insert(name.clone()) {
592                    names.push(name);
593                }
594            }
595        }
596    }
597
598    names
599}
600
601// ---------------------------------------------------------------------------
602// Helper functions
603// ---------------------------------------------------------------------------
604
605/// Get the alias or name of an expression
606fn get_alias_or_name(expr: &Expression) -> Option<String> {
607    match expr {
608        Expression::Alias(alias) => Some(alias.alias.name.clone()),
609        Expression::Column(col) => Some(col.name.name.clone()),
610        Expression::Identifier(id) => Some(id.name.clone()),
611        Expression::Star(_) => Some("*".to_string()),
612        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
613        // Unwrap to get the actual column/alias name from the inner expression.
614        Expression::Annotated(a) => get_alias_or_name(&a.this),
615        _ => None,
616    }
617}
618
619/// Resolve the display name for a column reference.
620fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
621    match column {
622        ColumnRef::Name(n) => n.to_string(),
623        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
624    }
625}
626
627/// Find the select expression matching a column reference.
628fn find_select_expr(
629    scope_expr: &Expression,
630    column: &ColumnRef<'_>,
631    dialect: Option<DialectType>,
632) -> Result<Expression> {
633    if let Expression::Select(ref select) = scope_expr {
634        match column {
635            ColumnRef::Name(name) => {
636                let normalized_name = normalize_column_name(name, dialect);
637                for expr in &select.expressions {
638                    if let Some(alias_or_name) = get_alias_or_name(expr) {
639                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
640                            return Ok(expr.clone());
641                        }
642                    }
643                }
644                Err(crate::error::Error::parse(
645                    format!("Cannot find column '{}' in query", name),
646                    0,
647                    0,
648                    0,
649                    0,
650                ))
651            }
652            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
653                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
654            }),
655        }
656    } else {
657        Err(crate::error::Error::parse(
658            "Expected SELECT expression for column lookup",
659            0,
660            0,
661            0,
662            0,
663        ))
664    }
665}
666
667/// Find the positional index of a column name in a set operation's first SELECT branch.
668fn column_to_index(
669    set_op_expr: &Expression,
670    name: &str,
671    dialect: Option<DialectType>,
672) -> Result<usize> {
673    let normalized_name = normalize_column_name(name, dialect);
674    let mut expr = set_op_expr;
675    loop {
676        match expr {
677            Expression::Union(u) => expr = &u.left,
678            Expression::Intersect(i) => expr = &i.left,
679            Expression::Except(e) => expr = &e.left,
680            Expression::Select(select) => {
681                for (i, e) in select.expressions.iter().enumerate() {
682                    if let Some(alias_or_name) = get_alias_or_name(e) {
683                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
684                            return Ok(i);
685                        }
686                    }
687                }
688                return Err(crate::error::Error::parse(
689                    format!("Cannot find column '{}' in set operation", name),
690                    0,
691                    0,
692                    0,
693                    0,
694                ));
695            }
696            _ => {
697                return Err(crate::error::Error::parse(
698                    "Expected SELECT or set operation",
699                    0,
700                    0,
701                    0,
702                    0,
703                ))
704            }
705        }
706    }
707}
708
/// Normalize a column name for comparison by delegating to
/// `schema::normalize_name` with this module's fixed flag values.
// NOTE(review): the meaning of the two boolean arguments (`false`, `true`) is
// defined by `normalize_name` and is not visible here — confirm before changing.
fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
    normalize_name(name, dialect, false, true)
}
712
713/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
714fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
715    if let Expression::Select(select) = select_expr {
716        let mut trimmed = select.as_ref().clone();
717        trimmed.expressions = vec![target_expr.clone()];
718        Expression::Select(Box::new(trimmed))
719    } else {
720        select_expr.clone()
721    }
722}
723
724/// Find the child scope (CTE or derived table) for a given source name.
725fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
726    // Check CTE scopes
727    if scope.cte_sources.contains_key(source_name) {
728        for cte_scope in &scope.cte_scopes {
729            if let Expression::Cte(cte) = &cte_scope.expression {
730                if cte.alias.name == source_name {
731                    return Some(cte_scope);
732                }
733            }
734        }
735    }
736
737    // Check derived table scopes
738    if let Some(source_info) = scope.sources.get(source_name) {
739        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
740            if let Expression::Subquery(sq) = &source_info.expression {
741                for dt_scope in &scope.derived_table_scopes {
742                    if dt_scope.expression == sq.this {
743                        return Some(dt_scope);
744                    }
745                }
746            }
747        }
748    }
749
750    None
751}
752
753/// Find a CTE scope by name, searching through a combined list of CTE scopes.
754/// This handles nested CTEs where the current scope doesn't have the CTE scope
755/// as a direct child but knows about it via cte_sources.
756fn find_child_scope_in<'a>(
757    all_cte_scopes: &[&'a Scope],
758    scope: &'a Scope,
759    source_name: &str,
760) -> Option<&'a Scope> {
761    // First try the scope's own cte_scopes
762    for cte_scope in &scope.cte_scopes {
763        if let Expression::Cte(cte) = &cte_scope.expression {
764            if cte.alias.name == source_name {
765                return Some(cte_scope);
766            }
767        }
768    }
769
770    // Then search through all ancestor CTE scopes
771    for cte_scope in all_cte_scopes {
772        if let Expression::Cte(cte) = &cte_scope.expression {
773            if cte.alias.name == source_name {
774                return Some(cte_scope);
775            }
776        }
777    }
778
779    // Fall back to derived table scopes
780    if let Some(source_info) = scope.sources.get(source_name) {
781        if source_info.is_scope {
782            if let Expression::Subquery(sq) = &source_info.expression {
783                for dt_scope in &scope.derived_table_scopes {
784                    if dt_scope.expression == sq.this {
785                        return Some(dt_scope);
786                    }
787                }
788            }
789        }
790    }
791
792    None
793}
794
795/// Create a terminal lineage node for a table.column reference.
796fn make_table_column_node(table: &str, column: &str) -> LineageNode {
797    let mut node = LineageNode::new(
798        format!("{}.{}", table, column),
799        Expression::Column(Box::new(crate::expressions::Column {
800            name: crate::expressions::Identifier::new(column.to_string()),
801            table: Some(crate::expressions::Identifier::new(table.to_string())),
802            join_mark: false,
803            trailing_comments: vec![],
804            span: None,
805            inferred_type: None,
806        })),
807        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
808    );
809    node.source_name = table.to_string();
810    node
811}
812
813fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
814    let mut parts: Vec<String> = Vec::new();
815    if let Some(catalog) = &table_ref.catalog {
816        parts.push(catalog.name.clone());
817    }
818    if let Some(schema) = &table_ref.schema {
819        parts.push(schema.name.clone());
820    }
821    parts.push(table_ref.name.name.clone());
822    parts.join(".")
823}
824
825fn make_table_column_node_from_source(
826    table_alias: &str,
827    column: &str,
828    source: &Expression,
829) -> LineageNode {
830    let mut node = LineageNode::new(
831        format!("{}.{}", table_alias, column),
832        Expression::Column(Box::new(crate::expressions::Column {
833            name: crate::expressions::Identifier::new(column.to_string()),
834            table: Some(crate::expressions::Identifier::new(table_alias.to_string())),
835            join_mark: false,
836            trailing_comments: vec![],
837            span: None,
838            inferred_type: None,
839        })),
840        source.clone(),
841    );
842
843    if let Expression::Table(table_ref) = source {
844        node.source_name = table_name_from_table_ref(table_ref);
845    } else {
846        node.source_name = table_alias.to_string();
847    }
848
849    node
850}
851
/// Simple column reference extracted from an expression
#[derive(Debug, Clone)]
struct SimpleColumnRef {
    /// Table qualifier copied from the column expression, if it had one
    table: Option<crate::expressions::Identifier>,
    /// Column name, without any qualifier
    column: String,
}
858
859/// Find all column references in an expression (does not recurse into subqueries).
860fn find_column_refs_in_expr(expr: &Expression) -> Vec<SimpleColumnRef> {
861    let mut refs = Vec::new();
862    collect_column_refs(expr, &mut refs);
863    refs
864}
865
/// Appends every column reference reachable from `expr` to `refs`.
///
/// The expression tree is walked iteratively with an explicit stack rather than
/// by recursion. Each `Expression::Column` encountered is recorded as a
/// `SimpleColumnRef`; `Expression::Subquery` and `Expression::Exists` nodes are
/// not descended into, and any variant without its own `match` arm is treated as
/// a leaf.
///
/// Because the stack is last-in/first-out, the child pushed last is visited
/// first, so `refs` is not in left-to-right source order. No de-duplication is
/// performed: a column referenced twice is recorded twice.
fn collect_column_refs(expr: &Expression, refs: &mut Vec<SimpleColumnRef>) {
    // Work list of nodes still to be inspected, seeded with the root expression.
    let mut stack: Vec<&Expression> = vec![expr];

    while let Some(current) = stack.pop() {
        match current {
            // === Leaf: collect Column references ===
            Expression::Column(col) => {
                refs.push(SimpleColumnRef {
                    table: col.table.clone(),
                    column: col.name.name.clone(),
                });
            }

            // === Boundary: don't recurse into subqueries (handled separately) ===
            Expression::Subquery(_) | Expression::Exists(_) => {}

            // === BinaryOp variants: left, right ===
            Expression::And(op)
            | Expression::Or(op)
            | Expression::Eq(op)
            | Expression::Neq(op)
            | Expression::Lt(op)
            | Expression::Lte(op)
            | Expression::Gt(op)
            | Expression::Gte(op)
            | Expression::Add(op)
            | Expression::Sub(op)
            | Expression::Mul(op)
            | Expression::Div(op)
            | Expression::Mod(op)
            | Expression::BitwiseAnd(op)
            | Expression::BitwiseOr(op)
            | Expression::BitwiseXor(op)
            | Expression::BitwiseLeftShift(op)
            | Expression::BitwiseRightShift(op)
            | Expression::Concat(op)
            | Expression::Adjacent(op)
            | Expression::TsMatch(op)
            | Expression::PropertyEQ(op)
            | Expression::ArrayContainsAll(op)
            | Expression::ArrayContainedBy(op)
            | Expression::ArrayOverlaps(op)
            | Expression::JSONBContainsAllTopKeys(op)
            | Expression::JSONBContainsAnyTopKeys(op)
            | Expression::JSONBDeleteAtPath(op)
            | Expression::ExtendsLeft(op)
            | Expression::ExtendsRight(op)
            | Expression::Is(op)
            | Expression::MemberOf(op)
            | Expression::NullSafeEq(op)
            | Expression::NullSafeNeq(op)
            | Expression::Glob(op)
            | Expression::Match(op) => {
                stack.push(&op.left);
                stack.push(&op.right);
            }

            // === UnaryOp variants: this ===
            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
                stack.push(&u.this);
            }

            // === UnaryFunc variants: this ===
            Expression::Upper(f)
            | Expression::Lower(f)
            | Expression::Length(f)
            | Expression::LTrim(f)
            | Expression::RTrim(f)
            | Expression::Reverse(f)
            | Expression::Abs(f)
            | Expression::Sqrt(f)
            | Expression::Cbrt(f)
            | Expression::Ln(f)
            | Expression::Exp(f)
            | Expression::Sign(f)
            | Expression::Date(f)
            | Expression::Time(f)
            | Expression::DateFromUnixDate(f)
            | Expression::UnixDate(f)
            | Expression::UnixSeconds(f)
            | Expression::UnixMillis(f)
            | Expression::UnixMicros(f)
            | Expression::TimeStrToDate(f)
            | Expression::DateToDi(f)
            | Expression::DiToDate(f)
            | Expression::TsOrDiToDi(f)
            | Expression::TsOrDsToDatetime(f)
            | Expression::TsOrDsToTimestamp(f)
            | Expression::YearOfWeek(f)
            | Expression::YearOfWeekIso(f)
            | Expression::Initcap(f)
            | Expression::Ascii(f)
            | Expression::Chr(f)
            | Expression::Soundex(f)
            | Expression::ByteLength(f)
            | Expression::Hex(f)
            | Expression::LowerHex(f)
            | Expression::Unicode(f)
            | Expression::Radians(f)
            | Expression::Degrees(f)
            | Expression::Sin(f)
            | Expression::Cos(f)
            | Expression::Tan(f)
            | Expression::Asin(f)
            | Expression::Acos(f)
            | Expression::Atan(f)
            | Expression::IsNan(f)
            | Expression::IsInf(f)
            | Expression::ArrayLength(f)
            | Expression::ArraySize(f)
            | Expression::Cardinality(f)
            | Expression::ArrayReverse(f)
            | Expression::ArrayDistinct(f)
            | Expression::ArrayFlatten(f)
            | Expression::ArrayCompact(f)
            | Expression::Explode(f)
            | Expression::ExplodeOuter(f)
            | Expression::ToArray(f)
            | Expression::MapFromEntries(f)
            | Expression::MapKeys(f)
            | Expression::MapValues(f)
            | Expression::JsonArrayLength(f)
            | Expression::JsonKeys(f)
            | Expression::JsonType(f)
            | Expression::ParseJson(f)
            | Expression::ToJson(f)
            | Expression::Typeof(f)
            | Expression::BitwiseCount(f)
            | Expression::Year(f)
            | Expression::Month(f)
            | Expression::Day(f)
            | Expression::Hour(f)
            | Expression::Minute(f)
            | Expression::Second(f)
            | Expression::DayOfWeek(f)
            | Expression::DayOfWeekIso(f)
            | Expression::DayOfMonth(f)
            | Expression::DayOfYear(f)
            | Expression::WeekOfYear(f)
            | Expression::Quarter(f)
            | Expression::Epoch(f)
            | Expression::EpochMs(f)
            | Expression::TimeStrToUnix(f)
            | Expression::SHA(f)
            | Expression::SHA1Digest(f)
            | Expression::TimeToUnix(f)
            | Expression::JSONBool(f)
            | Expression::Int64(f)
            | Expression::MD5NumberLower64(f)
            | Expression::MD5NumberUpper64(f)
            | Expression::DateStrToDate(f)
            | Expression::DateToDateStr(f) => {
                stack.push(&f.this);
            }

            // === BinaryFunc variants: this, expression ===
            Expression::Power(f)
            | Expression::NullIf(f)
            | Expression::IfNull(f)
            | Expression::Nvl(f)
            | Expression::UnixToTimeStr(f)
            | Expression::Contains(f)
            | Expression::StartsWith(f)
            | Expression::EndsWith(f)
            | Expression::Levenshtein(f)
            | Expression::ModFunc(f)
            | Expression::Atan2(f)
            | Expression::IntDiv(f)
            | Expression::AddMonths(f)
            | Expression::MonthsBetween(f)
            | Expression::NextDay(f)
            | Expression::ArrayContains(f)
            | Expression::ArrayPosition(f)
            | Expression::ArrayAppend(f)
            | Expression::ArrayPrepend(f)
            | Expression::ArrayUnion(f)
            | Expression::ArrayExcept(f)
            | Expression::ArrayRemove(f)
            | Expression::StarMap(f)
            | Expression::MapFromArrays(f)
            | Expression::MapContainsKey(f)
            | Expression::ElementAt(f)
            | Expression::JsonMergePatch(f)
            | Expression::JSONBContains(f)
            | Expression::JSONBExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }

            // === VarArgFunc variants: expressions ===
            Expression::Greatest(f)
            | Expression::Least(f)
            | Expression::Coalesce(f)
            | Expression::ArrayConcat(f)
            | Expression::ArrayIntersect(f)
            | Expression::ArrayZip(f)
            | Expression::MapConcat(f)
            | Expression::JsonArray(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }

            // === AggFunc variants: this, filter, having_max, limit ===
            Expression::Sum(f)
            | Expression::Avg(f)
            | Expression::Min(f)
            | Expression::Max(f)
            | Expression::ArrayAgg(f)
            | Expression::CountIf(f)
            | Expression::Stddev(f)
            | Expression::StddevPop(f)
            | Expression::StddevSamp(f)
            | Expression::Variance(f)
            | Expression::VarPop(f)
            | Expression::VarSamp(f)
            | Expression::Median(f)
            | Expression::Mode(f)
            | Expression::First(f)
            | Expression::Last(f)
            | Expression::AnyValue(f)
            | Expression::ApproxDistinct(f)
            | Expression::ApproxCountDistinct(f)
            | Expression::LogicalAnd(f)
            | Expression::LogicalOr(f)
            | Expression::Skewness(f)
            | Expression::ArrayConcatAgg(f)
            | Expression::ArrayUniqueAgg(f)
            | Expression::BoolXorAgg(f)
            | Expression::BitwiseAndAgg(f)
            | Expression::BitwiseOrAgg(f)
            | Expression::BitwiseXorAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some((ref expr, _)) = f.having_max {
                    stack.push(expr);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }

            // === Generic Function / AggregateFunction: args ===
            Expression::Function(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
            }
            Expression::AggregateFunction(func) => {
                for arg in &func.args {
                    stack.push(arg);
                }
                if let Some(ref filter) = func.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = func.limit {
                    stack.push(limit);
                }
            }

            // === WindowFunction: this (skip Over for lineage purposes) ===
            Expression::WindowFunction(wf) => {
                stack.push(&wf.this);
            }

            // === Containers and special expressions ===
            Expression::Alias(a) => {
                stack.push(&a.this);
            }
            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
                stack.push(&c.this);
                if let Some(ref fmt) = c.format {
                    stack.push(fmt);
                }
                if let Some(ref def) = c.default {
                    stack.push(def);
                }
            }
            Expression::Paren(p) => {
                stack.push(&p.this);
            }
            Expression::Annotated(a) => {
                stack.push(&a.this);
            }
            Expression::Case(case) => {
                if let Some(ref operand) = case.operand {
                    stack.push(operand);
                }
                for (cond, result) in &case.whens {
                    stack.push(cond);
                    stack.push(result);
                }
                if let Some(ref else_expr) = case.else_ {
                    stack.push(else_expr);
                }
            }
            Expression::Collation(c) => {
                stack.push(&c.this);
            }
            Expression::In(i) => {
                stack.push(&i.this);
                for e in &i.expressions {
                    stack.push(e);
                }
                // The optional query is pushed like any other child; whether it is
                // descended into depends on which arm it hits when popped (the
                // Subquery/Exists boundary and the catch-all both stop traversal).
                if let Some(ref q) = i.query {
                    stack.push(q);
                }
                if let Some(ref u) = i.unnest {
                    stack.push(u);
                }
            }
            Expression::Between(b) => {
                stack.push(&b.this);
                stack.push(&b.low);
                stack.push(&b.high);
            }
            Expression::IsNull(n) => {
                stack.push(&n.this);
            }
            Expression::IsTrue(t) | Expression::IsFalse(t) => {
                stack.push(&t.this);
            }
            Expression::IsJson(j) => {
                stack.push(&j.this);
            }
            Expression::Like(l) | Expression::ILike(l) => {
                stack.push(&l.left);
                stack.push(&l.right);
                if let Some(ref esc) = l.escape {
                    stack.push(esc);
                }
            }
            Expression::SimilarTo(s) => {
                stack.push(&s.this);
                stack.push(&s.pattern);
                if let Some(ref esc) = s.escape {
                    stack.push(esc);
                }
            }
            Expression::Ordered(o) => {
                stack.push(&o.this);
            }
            Expression::Array(a) => {
                for e in &a.expressions {
                    stack.push(e);
                }
            }
            Expression::Tuple(t) => {
                for e in &t.expressions {
                    stack.push(e);
                }
            }
            Expression::Struct(s) => {
                for (_, e) in &s.fields {
                    stack.push(e);
                }
            }
            Expression::Subscript(s) => {
                stack.push(&s.this);
                stack.push(&s.index);
            }
            Expression::Dot(d) => {
                stack.push(&d.this);
            }
            Expression::MethodCall(m) => {
                stack.push(&m.this);
                for arg in &m.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySlice(s) => {
                stack.push(&s.this);
                if let Some(ref start) = s.start {
                    stack.push(start);
                }
                if let Some(ref end) = s.end {
                    stack.push(end);
                }
            }
            Expression::Lambda(l) => {
                stack.push(&l.body);
            }
            Expression::NamedArgument(n) => {
                stack.push(&n.value);
            }
            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
                stack.push(e);
            }

            // === Custom function structs ===
            Expression::Substring(f) => {
                stack.push(&f.this);
                stack.push(&f.start);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Trim(f) => {
                stack.push(&f.this);
                if let Some(ref chars) = f.characters {
                    stack.push(chars);
                }
            }
            Expression::Replace(f) => {
                stack.push(&f.this);
                stack.push(&f.old);
                stack.push(&f.new);
            }
            Expression::IfFunc(f) => {
                stack.push(&f.condition);
                stack.push(&f.true_value);
                if let Some(ref fv) = f.false_value {
                    stack.push(fv);
                }
            }
            Expression::Nvl2(f) => {
                stack.push(&f.this);
                stack.push(&f.true_value);
                stack.push(&f.false_value);
            }
            Expression::ConcatWs(f) => {
                stack.push(&f.separator);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Count(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::GroupConcat(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::StringAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
                if let Some(ref limit) = f.limit {
                    stack.push(limit);
                }
            }
            Expression::ListAgg(f) => {
                stack.push(&f.this);
                if let Some(ref sep) = f.separator {
                    stack.push(sep);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::SumIf(f) => {
                stack.push(&f.this);
                stack.push(&f.condition);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::DateAdd(f) | Expression::DateSub(f) => {
                stack.push(&f.this);
                stack.push(&f.interval);
            }
            Expression::DateDiff(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
            }
            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
                stack.push(&f.this);
            }
            Expression::Extract(f) => {
                stack.push(&f.this);
            }
            Expression::Round(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::Floor(f) => {
                stack.push(&f.this);
                if let Some(ref s) = f.scale {
                    stack.push(s);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Ceil(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
                if let Some(ref t) = f.to {
                    stack.push(t);
                }
            }
            Expression::Log(f) => {
                stack.push(&f.this);
                if let Some(ref b) = f.base {
                    stack.push(b);
                }
            }
            Expression::AtTimeZone(f) => {
                stack.push(&f.this);
                stack.push(&f.zone);
            }
            Expression::Lead(f) | Expression::Lag(f) => {
                stack.push(&f.this);
                if let Some(ref off) = f.offset {
                    stack.push(off);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::FirstValue(f) | Expression::LastValue(f) => {
                stack.push(&f.this);
            }
            Expression::NthValue(f) => {
                stack.push(&f.this);
                stack.push(&f.offset);
            }
            Expression::Position(f) => {
                stack.push(&f.substring);
                stack.push(&f.string);
                if let Some(ref start) = f.start {
                    stack.push(start);
                }
            }
            Expression::Decode(f) => {
                stack.push(&f.this);
                for (search, result) in &f.search_results {
                    stack.push(search);
                    stack.push(result);
                }
                if let Some(ref def) = f.default {
                    stack.push(def);
                }
            }
            Expression::CharFunc(f) => {
                for arg in &f.args {
                    stack.push(arg);
                }
            }
            Expression::ArraySort(f) => {
                stack.push(&f.this);
                if let Some(ref cmp) = f.comparator {
                    stack.push(cmp);
                }
            }
            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
                stack.push(&f.this);
                stack.push(&f.separator);
                if let Some(ref nr) = f.null_replacement {
                    stack.push(nr);
                }
            }
            Expression::ArrayFilter(f) => {
                stack.push(&f.this);
                stack.push(&f.filter);
            }
            Expression::ArrayTransform(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::Sequence(f)
            | Expression::Generate(f)
            | Expression::ExplodingGenerateSeries(f) => {
                stack.push(&f.start);
                stack.push(&f.stop);
                if let Some(ref step) = f.step {
                    stack.push(step);
                }
            }
            Expression::JsonExtract(f)
            | Expression::JsonExtractScalar(f)
            | Expression::JsonQuery(f)
            | Expression::JsonValue(f) => {
                stack.push(&f.this);
                stack.push(&f.path);
            }
            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
                stack.push(&f.this);
                for p in &f.paths {
                    stack.push(p);
                }
            }
            Expression::JsonObject(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
                stack.push(&f.this);
                for (path, val) in &f.path_values {
                    stack.push(path);
                    stack.push(val);
                }
            }
            Expression::Overlay(f) => {
                stack.push(&f.this);
                stack.push(&f.replacement);
                stack.push(&f.from);
                if let Some(ref len) = f.length {
                    stack.push(len);
                }
            }
            Expression::Convert(f) => {
                stack.push(&f.this);
                if let Some(ref style) = f.style {
                    stack.push(style);
                }
            }
            Expression::ApproxPercentile(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref acc) = f.accuracy {
                    stack.push(acc);
                }
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::Percentile(f)
            | Expression::PercentileCont(f)
            | Expression::PercentileDisc(f) => {
                stack.push(&f.this);
                stack.push(&f.percentile);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::WithinGroup(f) => {
                stack.push(&f.this);
            }
            Expression::Left(f) | Expression::Right(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
            }
            Expression::Repeat(f) => {
                stack.push(&f.this);
                stack.push(&f.times);
            }
            Expression::Lpad(f) | Expression::Rpad(f) => {
                stack.push(&f.this);
                stack.push(&f.length);
                if let Some(ref fill) = f.fill {
                    stack.push(fill);
                }
            }
            Expression::Split(f) => {
                stack.push(&f.this);
                stack.push(&f.delimiter);
            }
            Expression::RegexpLike(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpReplace(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                stack.push(&f.replacement);
                if let Some(ref flags) = f.flags {
                    stack.push(flags);
                }
            }
            Expression::RegexpExtract(f) => {
                stack.push(&f.this);
                stack.push(&f.pattern);
                if let Some(ref group) = f.group {
                    stack.push(group);
                }
            }
            Expression::ToDate(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::ToTimestamp(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::DateFormat(f) | Expression::FormatDate(f) => {
                stack.push(&f.this);
                stack.push(&f.format);
            }
            Expression::LastDay(f) => {
                stack.push(&f.this);
            }
            Expression::FromUnixtime(f) => {
                stack.push(&f.this);
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::UnixTimestamp(f) => {
                if let Some(ref this) = f.this {
                    stack.push(this);
                }
                if let Some(ref fmt) = f.format {
                    stack.push(fmt);
                }
            }
            Expression::MakeDate(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
            }
            Expression::MakeTimestamp(f) => {
                stack.push(&f.year);
                stack.push(&f.month);
                stack.push(&f.day);
                stack.push(&f.hour);
                stack.push(&f.minute);
                stack.push(&f.second);
                if let Some(ref tz) = f.timezone {
                    stack.push(tz);
                }
            }
            Expression::TruncFunc(f) => {
                stack.push(&f.this);
                if let Some(ref d) = f.decimals {
                    stack.push(d);
                }
            }
            Expression::ArrayFunc(f) => {
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::Unnest(f) => {
                stack.push(&f.this);
                for e in &f.expressions {
                    stack.push(e);
                }
            }
            Expression::StructFunc(f) => {
                for (_, e) in &f.fields {
                    stack.push(e);
                }
            }
            Expression::StructExtract(f) => {
                stack.push(&f.this);
            }
            Expression::NamedStruct(f) => {
                for (k, v) in &f.pairs {
                    stack.push(k);
                    stack.push(v);
                }
            }
            Expression::MapFunc(f) => {
                for k in &f.keys {
                    stack.push(k);
                }
                for v in &f.values {
                    stack.push(v);
                }
            }
            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
                stack.push(&f.this);
                stack.push(&f.transform);
            }
            Expression::JsonArrayAgg(f) => {
                stack.push(&f.this);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::JsonObjectAgg(f) => {
                stack.push(&f.key);
                stack.push(&f.value);
                if let Some(ref filter) = f.filter {
                    stack.push(filter);
                }
            }
            Expression::NTile(f) => {
                if let Some(ref n) = f.num_buckets {
                    stack.push(n);
                }
            }
            Expression::Rand(f) => {
                if let Some(ref s) = f.seed {
                    stack.push(s);
                }
                if let Some(ref lo) = f.lower {
                    stack.push(lo);
                }
                if let Some(ref hi) = f.upper {
                    stack.push(hi);
                }
            }
            Expression::Any(q) | Expression::All(q) => {
                stack.push(&q.this);
                // As with `In`, the subquery operand is only pushed here; the arm it
                // matches when popped decides whether traversal continues.
                stack.push(&q.subquery);
            }
            Expression::Overlaps(o) => {
                if let Some(ref this) = o.this {
                    stack.push(this);
                }
                if let Some(ref expr) = o.expression {
                    stack.push(expr);
                }
                if let Some(ref ls) = o.left_start {
                    stack.push(ls);
                }
                if let Some(ref le) = o.left_end {
                    stack.push(le);
                }
                if let Some(ref rs) = o.right_start {
                    stack.push(rs);
                }
                if let Some(ref re) = o.right_end {
                    stack.push(re);
                }
            }
            Expression::Interval(i) => {
                if let Some(ref this) = i.this {
                    stack.push(this);
                }
            }
            Expression::TimeStrToTime(f) => {
                stack.push(&f.this);
                if let Some(ref zone) = f.zone {
                    stack.push(zone);
                }
            }
            Expression::JSONBExtractScalar(f) => {
                stack.push(&f.this);
                stack.push(&f.expression);
                if let Some(ref jt) = f.json_type {
                    stack.push(jt);
                }
            }

            // === True leaves and non-expression-bearing nodes ===
            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
            _ => {}
        }
    }
}
1730
1731// ---------------------------------------------------------------------------
1732// Tests
1733// ---------------------------------------------------------------------------
1734
1735#[cfg(test)]
1736mod tests {
1737    use super::*;
1738    use crate::dialects::{Dialect, DialectType};
1739    use crate::expressions::DataType;
1740    use crate::optimizer::annotate_types::annotate_types;
1741    use crate::parse_one;
1742    use crate::schema::{MappingSchema, Schema};
1743
1744    fn parse(sql: &str) -> Expression {
1745        let dialect = Dialect::get(DialectType::Generic);
1746        let ast = dialect.parse(sql).unwrap();
1747        ast.into_iter().next().unwrap()
1748    }
1749
1750    #[test]
1751    fn test_simple_lineage() {
1752        let expr = parse("SELECT a FROM t");
1753        let node = lineage("a", &expr, None, false).unwrap();
1754
1755        assert_eq!(node.name, "a");
1756        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
1757        // Should trace to t.a
1758        let names = node.downstream_names();
1759        assert!(
1760            names.iter().any(|n| n == "t.a"),
1761            "Expected t.a in downstream, got: {:?}",
1762            names
1763        );
1764    }
1765
1766    #[test]
1767    fn test_lineage_walk() {
1768        let root = LineageNode {
1769            name: "col_a".to_string(),
1770            expression: Expression::Null(crate::expressions::Null),
1771            source: Expression::Null(crate::expressions::Null),
1772            downstream: vec![LineageNode::new(
1773                "t.a",
1774                Expression::Null(crate::expressions::Null),
1775                Expression::Null(crate::expressions::Null),
1776            )],
1777            source_name: String::new(),
1778            reference_node_name: String::new(),
1779        };
1780
1781        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
1782        assert_eq!(names.len(), 2);
1783        assert_eq!(names[0], "col_a");
1784        assert_eq!(names[1], "t.a");
1785    }
1786
1787    #[test]
1788    fn test_aliased_column() {
1789        let expr = parse("SELECT a + 1 AS b FROM t");
1790        let node = lineage("b", &expr, None, false).unwrap();
1791
1792        assert_eq!(node.name, "b");
1793        // Should trace through the expression to t.a
1794        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
1795        assert!(
1796            all_names.iter().any(|n| n.contains("a")),
1797            "Expected to trace to column a, got: {:?}",
1798            all_names
1799        );
1800    }
1801
1802    #[test]
1803    fn test_qualified_column() {
1804        let expr = parse("SELECT t.a FROM t");
1805        let node = lineage("a", &expr, None, false).unwrap();
1806
1807        assert_eq!(node.name, "a");
1808        let names = node.downstream_names();
1809        assert!(
1810            names.iter().any(|n| n == "t.a"),
1811            "Expected t.a, got: {:?}",
1812            names
1813        );
1814    }
1815
1816    #[test]
1817    fn test_unqualified_column() {
1818        let expr = parse("SELECT a FROM t");
1819        let node = lineage("a", &expr, None, false).unwrap();
1820
1821        // Unqualified but single source → resolved to t.a
1822        let names = node.downstream_names();
1823        assert!(
1824            names.iter().any(|n| n == "t.a"),
1825            "Expected t.a, got: {:?}",
1826            names
1827        );
1828    }
1829
1830    #[test]
1831    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
1832        let query = "SELECT name FROM users";
1833        let dialect = Dialect::get(DialectType::BigQuery);
1834        let expr = dialect
1835            .parse(query)
1836            .unwrap()
1837            .into_iter()
1838            .next()
1839            .expect("expected one expression");
1840
1841        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1842        schema
1843            .add_table("users", &[("name".into(), DataType::Text)], None)
1844            .expect("schema setup");
1845
1846        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
1847            .expect("lineage without schema");
1848        let mut expr_without = node_without_schema.expression.clone();
1849        annotate_types(
1850            &mut expr_without,
1851            Some(&schema),
1852            Some(DialectType::BigQuery),
1853        );
1854        assert_eq!(
1855            expr_without.inferred_type(),
1856            None,
1857            "Expected unresolved root type without schema-aware lineage qualification"
1858        );
1859
1860        let node_with_schema = lineage_with_schema(
1861            "name",
1862            &expr,
1863            Some(&schema),
1864            Some(DialectType::BigQuery),
1865            false,
1866        )
1867        .expect("lineage with schema");
1868        let mut expr_with = node_with_schema.expression.clone();
1869        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
1870
1871        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
1872    }
1873
1874    #[test]
1875    fn test_lineage_with_schema_correlated_scalar_subquery() {
1876        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
1877        let dialect = Dialect::get(DialectType::BigQuery);
1878        let expr = dialect
1879            .parse(query)
1880            .unwrap()
1881            .into_iter()
1882            .next()
1883            .expect("expected one expression");
1884
1885        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1886        schema
1887            .add_table(
1888                "t1",
1889                &[("id".into(), DataType::BigInt { length: None })],
1890                None,
1891            )
1892            .expect("schema setup");
1893        schema
1894            .add_table(
1895                "t2",
1896                &[
1897                    ("id".into(), DataType::BigInt { length: None }),
1898                    ("val".into(), DataType::BigInt { length: None }),
1899                ],
1900                None,
1901            )
1902            .expect("schema setup");
1903
1904        let node = lineage_with_schema(
1905            "id",
1906            &expr,
1907            Some(&schema),
1908            Some(DialectType::BigQuery),
1909            false,
1910        )
1911        .expect("lineage_with_schema should handle correlated scalar subqueries");
1912
1913        assert_eq!(node.name, "id");
1914    }
1915
1916    #[test]
1917    fn test_lineage_with_schema_join_using() {
1918        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
1919        let dialect = Dialect::get(DialectType::BigQuery);
1920        let expr = dialect
1921            .parse(query)
1922            .unwrap()
1923            .into_iter()
1924            .next()
1925            .expect("expected one expression");
1926
1927        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1928        schema
1929            .add_table(
1930                "t1",
1931                &[("a".into(), DataType::BigInt { length: None })],
1932                None,
1933            )
1934            .expect("schema setup");
1935        schema
1936            .add_table(
1937                "t2",
1938                &[("a".into(), DataType::BigInt { length: None })],
1939                None,
1940            )
1941            .expect("schema setup");
1942
1943        let node = lineage_with_schema(
1944            "a",
1945            &expr,
1946            Some(&schema),
1947            Some(DialectType::BigQuery),
1948            false,
1949        )
1950        .expect("lineage_with_schema should handle JOIN USING");
1951
1952        assert_eq!(node.name, "a");
1953    }
1954
1955    #[test]
1956    fn test_lineage_with_schema_qualified_table_name() {
1957        let query = "SELECT a FROM raw.t1";
1958        let dialect = Dialect::get(DialectType::BigQuery);
1959        let expr = dialect
1960            .parse(query)
1961            .unwrap()
1962            .into_iter()
1963            .next()
1964            .expect("expected one expression");
1965
1966        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
1967        schema
1968            .add_table(
1969                "raw.t1",
1970                &[("a".into(), DataType::BigInt { length: None })],
1971                None,
1972            )
1973            .expect("schema setup");
1974
1975        let node = lineage_with_schema(
1976            "a",
1977            &expr,
1978            Some(&schema),
1979            Some(DialectType::BigQuery),
1980            false,
1981        )
1982        .expect("lineage_with_schema should handle dotted schema.table names");
1983
1984        assert_eq!(node.name, "a");
1985    }
1986
1987    #[test]
1988    fn test_lineage_with_schema_none_matches_lineage() {
1989        let expr = parse("SELECT a FROM t");
1990        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
1991        let with_none =
1992            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
1993
1994        assert_eq!(with_none.name, baseline.name);
1995        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
1996    }
1997
1998    #[test]
1999    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
2000        let dialect = Dialect::get(DialectType::BigQuery);
2001        let expr = dialect
2002            .parse("SELECT Name AS name FROM teams")
2003            .unwrap()
2004            .into_iter()
2005            .next()
2006            .expect("expected one expression");
2007
2008        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
2009        schema
2010            .add_table(
2011                "teams",
2012                &[("Name".into(), DataType::String { length: None })],
2013                None,
2014            )
2015            .expect("schema setup");
2016
2017        let node = lineage_with_schema(
2018            "name",
2019            &expr,
2020            Some(&schema),
2021            Some(DialectType::BigQuery),
2022            false,
2023        )
2024        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
2025
2026        let names = node.downstream_names();
2027        assert!(
2028            names.iter().any(|n| n == "teams.Name"),
2029            "Expected teams.Name in downstream, got: {:?}",
2030            names
2031        );
2032    }
2033
2034    #[test]
2035    fn test_lineage_bigquery_mixed_case_alias_lookup() {
2036        let dialect = Dialect::get(DialectType::BigQuery);
2037        let expr = dialect
2038            .parse("SELECT Name AS Name FROM teams")
2039            .unwrap()
2040            .into_iter()
2041            .next()
2042            .expect("expected one expression");
2043
2044        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
2045            .expect("lineage should resolve mixed-case aliases in BigQuery");
2046
2047        assert_eq!(node.name, "name");
2048    }
2049
2050    #[test]
2051    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
2052        let expr = parse_one(
2053            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2054            DialectType::Snowflake,
2055        )
2056        .expect("parse");
2057
2058        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2059        schema
2060            .add_table(
2061                "fact.some_daily_metrics",
2062                &[("date_utc".to_string(), DataType::Date)],
2063                None,
2064            )
2065            .expect("schema setup");
2066
2067        let node = lineage_with_schema(
2068            "recency",
2069            &expr,
2070            Some(&schema),
2071            Some(DialectType::Snowflake),
2072            false,
2073        )
2074        .expect("lineage_with_schema should not treat date part as a column");
2075
2076        let names = node.downstream_names();
2077        assert!(
2078            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2079            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2080            names
2081        );
2082        assert!(
2083            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2084            "Did not expect date part to appear as lineage column, got: {:?}",
2085            names
2086        );
2087    }
2088
2089    #[test]
2090    fn test_snowflake_datediff_parses_to_typed_ast() {
2091        let expr = parse_one(
2092            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
2093            DialectType::Snowflake,
2094        )
2095        .expect("parse");
2096
2097        match expr {
2098            Expression::Select(select) => match &select.expressions[0] {
2099                Expression::Alias(alias) => match &alias.this {
2100                    Expression::DateDiff(f) => {
2101                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
2102                    }
2103                    other => panic!("expected DateDiff, got {other:?}"),
2104                },
2105                other => panic!("expected Alias, got {other:?}"),
2106            },
2107            other => panic!("expected Select, got {other:?}"),
2108        }
2109    }
2110
2111    #[test]
2112    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
2113        let expr = parse_one(
2114            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2115            DialectType::Snowflake,
2116        )
2117        .expect("parse");
2118
2119        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2120        schema
2121            .add_table(
2122                "fact.some_daily_metrics",
2123                &[("date_utc".to_string(), DataType::Date)],
2124                None,
2125            )
2126            .expect("schema setup");
2127
2128        let node = lineage_with_schema(
2129            "next_day",
2130            &expr,
2131            Some(&schema),
2132            Some(DialectType::Snowflake),
2133            false,
2134        )
2135        .expect("lineage_with_schema should not treat DATEADD date part as a column");
2136
2137        let names = node.downstream_names();
2138        assert!(
2139            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2140            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2141            names
2142        );
2143        assert!(
2144            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2145            "Did not expect date part to appear as lineage column, got: {:?}",
2146            names
2147        );
2148    }
2149
2150    #[test]
2151    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
2152        let expr = parse_one(
2153            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2154            DialectType::Snowflake,
2155        )
2156        .expect("parse");
2157
2158        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2159        schema
2160            .add_table(
2161                "fact.some_daily_metrics",
2162                &[("date_utc".to_string(), DataType::Date)],
2163                None,
2164            )
2165            .expect("schema setup");
2166
2167        let node = lineage_with_schema(
2168            "day_part",
2169            &expr,
2170            Some(&schema),
2171            Some(DialectType::Snowflake),
2172            false,
2173        )
2174        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
2175
2176        let names = node.downstream_names();
2177        assert!(
2178            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2179            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2180            names
2181        );
2182        assert!(
2183            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
2184            "Did not expect date part to appear as lineage column, got: {:?}",
2185            names
2186        );
2187    }
2188
2189    #[test]
2190    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
2191        let expr = parse_one(
2192            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2193            DialectType::Snowflake,
2194        )
2195        .expect("parse");
2196
2197        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
2198        schema
2199            .add_table(
2200                "fact.some_daily_metrics",
2201                &[("date_utc".to_string(), DataType::Date)],
2202                None,
2203            )
2204            .expect("schema setup");
2205
2206        let node = lineage_with_schema(
2207            "day_part",
2208            &expr,
2209            Some(&schema),
2210            Some(DialectType::Snowflake),
2211            false,
2212        )
2213        .expect("quoted DATE_PART should continue to work");
2214
2215        let names = node.downstream_names();
2216        assert!(
2217            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
2218            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
2219            names
2220        );
2221    }
2222
2223    #[test]
2224    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
2225        let expr = parse_one(
2226            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
2227            DialectType::Snowflake,
2228        )
2229        .expect("parse");
2230
2231        match expr {
2232            Expression::Select(select) => match &select.expressions[0] {
2233                Expression::Alias(alias) => match &alias.this {
2234                    Expression::Function(f) => {
2235                        assert_eq!(f.name.to_uppercase(), "DATEADD");
2236                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2237                    }
2238                    other => panic!("expected generic DATEADD function, got {other:?}"),
2239                },
2240                other => panic!("expected Alias, got {other:?}"),
2241            },
2242            other => panic!("expected Select, got {other:?}"),
2243        }
2244    }
2245
2246    #[test]
2247    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
2248        let expr = parse_one(
2249            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
2250            DialectType::Snowflake,
2251        )
2252        .expect("parse");
2253
2254        match expr {
2255            Expression::Select(select) => match &select.expressions[0] {
2256                Expression::Alias(alias) => match &alias.this {
2257                    Expression::Function(f) => {
2258                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2259                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
2260                    }
2261                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2262                },
2263                other => panic!("expected Alias, got {other:?}"),
2264            },
2265            other => panic!("expected Select, got {other:?}"),
2266        }
2267    }
2268
2269    #[test]
2270    fn test_snowflake_date_part_string_literal_stays_generic_function() {
2271        let expr = parse_one(
2272            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
2273            DialectType::Snowflake,
2274        )
2275        .expect("parse");
2276
2277        match expr {
2278            Expression::Select(select) => match &select.expressions[0] {
2279                Expression::Alias(alias) => match &alias.this {
2280                    Expression::Function(f) => {
2281                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
2282                    }
2283                    other => panic!("expected generic DATE_PART function, got {other:?}"),
2284                },
2285                other => panic!("expected Alias, got {other:?}"),
2286            },
2287            other => panic!("expected Select, got {other:?}"),
2288        }
2289    }
2290
2291    #[test]
2292    fn test_lineage_join() {
2293        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2294
2295        let node_a = lineage("a", &expr, None, false).unwrap();
2296        let names_a = node_a.downstream_names();
2297        assert!(
2298            names_a.iter().any(|n| n == "t.a"),
2299            "Expected t.a, got: {:?}",
2300            names_a
2301        );
2302
2303        let node_b = lineage("b", &expr, None, false).unwrap();
2304        let names_b = node_b.downstream_names();
2305        assert!(
2306            names_b.iter().any(|n| n == "s.b"),
2307            "Expected s.b, got: {:?}",
2308            names_b
2309        );
2310    }
2311
2312    #[test]
2313    fn test_lineage_alias_leaf_has_resolved_source_name() {
2314        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
2315        let node = lineage("col1", &expr, None, false).unwrap();
2316
2317        // Keep alias in the display lineage edge.
2318        let names = node.downstream_names();
2319        assert!(
2320            names.iter().any(|n| n == "t1.col1"),
2321            "Expected aliased column edge t1.col1, got: {:?}",
2322            names
2323        );
2324
2325        // Leaf should expose the resolved base table for consumers.
2326        let leaf = node
2327            .downstream
2328            .iter()
2329            .find(|n| n.name == "t1.col1")
2330            .expect("Expected t1.col1 leaf");
2331        assert_eq!(leaf.source_name, "table1");
2332        match &leaf.source {
2333            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
2334            _ => panic!("Expected leaf source to be a table expression"),
2335        }
2336    }
2337
2338    #[test]
2339    fn test_lineage_derived_table() {
2340        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
2341        let node = lineage("a", &expr, None, false).unwrap();
2342
2343        assert_eq!(node.name, "a");
2344        // Should trace through the derived table to t.a
2345        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2346        assert!(
2347            all_names.iter().any(|n| n == "t.a"),
2348            "Expected to trace through derived table to t.a, got: {:?}",
2349            all_names
2350        );
2351    }
2352
2353    #[test]
2354    fn test_lineage_cte() {
2355        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
2356        let node = lineage("a", &expr, None, false).unwrap();
2357
2358        assert_eq!(node.name, "a");
2359        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2360        assert!(
2361            all_names.iter().any(|n| n == "t.a"),
2362            "Expected to trace through CTE to t.a, got: {:?}",
2363            all_names
2364        );
2365    }
2366
2367    #[test]
2368    fn test_lineage_union() {
2369        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
2370        let node = lineage("a", &expr, None, false).unwrap();
2371
2372        assert_eq!(node.name, "a");
2373        // Should have 2 downstream branches
2374        assert_eq!(
2375            node.downstream.len(),
2376            2,
2377            "Expected 2 branches for UNION, got {}",
2378            node.downstream.len()
2379        );
2380    }
2381
2382    #[test]
2383    fn test_lineage_cte_union() {
2384        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
2385        let node = lineage("a", &expr, None, false).unwrap();
2386
2387        // Should trace through CTE into both UNION branches
2388        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2389        assert!(
2390            all_names.len() >= 3,
2391            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
2392            all_names
2393        );
2394    }
2395
2396    #[test]
2397    fn test_lineage_star() {
2398        let expr = parse("SELECT * FROM t");
2399        let node = lineage("*", &expr, None, false).unwrap();
2400
2401        assert_eq!(node.name, "*");
2402        // Should have downstream for table t
2403        assert!(
2404            !node.downstream.is_empty(),
2405            "Star should produce downstream nodes"
2406        );
2407    }
2408
2409    #[test]
2410    fn test_lineage_subquery_in_select() {
2411        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
2412        let node = lineage("x", &expr, None, false).unwrap();
2413
2414        assert_eq!(node.name, "x");
2415        // Should have traced into the scalar subquery
2416        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2417        assert!(
2418            all_names.len() >= 2,
2419            "Expected tracing into scalar subquery, got: {:?}",
2420            all_names
2421        );
2422    }
2423
2424    #[test]
2425    fn test_lineage_multiple_columns() {
2426        let expr = parse("SELECT a, b FROM t");
2427
2428        let node_a = lineage("a", &expr, None, false).unwrap();
2429        let node_b = lineage("b", &expr, None, false).unwrap();
2430
2431        assert_eq!(node_a.name, "a");
2432        assert_eq!(node_b.name, "b");
2433
2434        // Each should trace independently
2435        let names_a = node_a.downstream_names();
2436        let names_b = node_b.downstream_names();
2437        assert!(names_a.iter().any(|n| n == "t.a"));
2438        assert!(names_b.iter().any(|n| n == "t.b"));
2439    }
2440
2441    #[test]
2442    fn test_get_source_tables() {
2443        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
2444        let node = lineage("a", &expr, None, false).unwrap();
2445
2446        let tables = get_source_tables(&node);
2447        assert!(
2448            tables.contains("t"),
2449            "Expected source table 't', got: {:?}",
2450            tables
2451        );
2452    }
2453
2454    #[test]
2455    fn test_lineage_column_not_found() {
2456        let expr = parse("SELECT a FROM t");
2457        let result = lineage("nonexistent", &expr, None, false);
2458        assert!(result.is_err());
2459    }
2460
2461    #[test]
2462    fn test_lineage_nested_cte() {
2463        let expr = parse(
2464            "WITH cte1 AS (SELECT a FROM t), \
2465             cte2 AS (SELECT a FROM cte1) \
2466             SELECT a FROM cte2",
2467        );
2468        let node = lineage("a", &expr, None, false).unwrap();
2469
2470        // Should trace through cte2 → cte1 → t
2471        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2472        assert!(
2473            all_names.len() >= 3,
2474            "Expected to trace through nested CTEs, got: {:?}",
2475            all_names
2476        );
2477    }
2478
2479    #[test]
2480    fn test_trim_selects_true() {
2481        let expr = parse("SELECT a, b, c FROM t");
2482        let node = lineage("a", &expr, None, true).unwrap();
2483
2484        // The source should be trimmed to only include 'a'
2485        if let Expression::Select(select) = &node.source {
2486            assert_eq!(
2487                select.expressions.len(),
2488                1,
2489                "Trimmed source should have 1 expression, got {}",
2490                select.expressions.len()
2491            );
2492        } else {
2493            panic!("Expected Select source");
2494        }
2495    }
2496
2497    #[test]
2498    fn test_trim_selects_false() {
2499        let expr = parse("SELECT a, b, c FROM t");
2500        let node = lineage("a", &expr, None, false).unwrap();
2501
2502        // The source should keep all columns
2503        if let Expression::Select(select) = &node.source {
2504            assert_eq!(
2505                select.expressions.len(),
2506                3,
2507                "Untrimmed source should have 3 expressions"
2508            );
2509        } else {
2510            panic!("Expected Select source");
2511        }
2512    }
2513
2514    #[test]
2515    fn test_lineage_expression_in_select() {
2516        let expr = parse("SELECT a + b AS c FROM t");
2517        let node = lineage("c", &expr, None, false).unwrap();
2518
2519        // Should trace to both a and b from t
2520        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
2521        assert!(
2522            all_names.len() >= 3,
2523            "Expected to trace a + b to both columns, got: {:?}",
2524            all_names
2525        );
2526    }
2527
2528    #[test]
2529    fn test_set_operation_by_index() {
2530        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
2531
2532        // Trace column "a" which is at index 0
2533        let node = lineage("a", &expr, None, false).unwrap();
2534
2535        // UNION branches should be traced by index
2536        assert_eq!(node.downstream.len(), 2);
2537    }
2538
2539    // --- Tests for column lineage inside function calls (issue #18) ---
2540
2541    fn print_node(node: &LineageNode, indent: usize) {
2542        let pad = "  ".repeat(indent);
2543        println!(
2544            "{pad}name={:?} source_name={:?}",
2545            node.name, node.source_name
2546        );
2547        for child in &node.downstream {
2548            print_node(child, indent + 1);
2549        }
2550    }
2551
2552    #[test]
2553    fn test_issue18_repro() {
2554        // Exact scenario from the issue
2555        let query = "SELECT UPPER(name) as upper_name FROM users";
2556        println!("Query: {query}\n");
2557
2558        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
2559        let exprs = dialect.parse(query).unwrap();
2560        let expr = &exprs[0];
2561
2562        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
2563        println!("lineage(\"upper_name\"):");
2564        print_node(&node, 1);
2565
2566        let names = node.downstream_names();
2567        assert!(
2568            names.iter().any(|n| n == "users.name"),
2569            "Expected users.name in downstream, got: {:?}",
2570            names
2571        );
2572    }
2573
2574    #[test]
2575    fn test_lineage_upper_function() {
2576        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
2577        let node = lineage("upper_name", &expr, None, false).unwrap();
2578
2579        let names = node.downstream_names();
2580        assert!(
2581            names.iter().any(|n| n == "users.name"),
2582            "Expected users.name in downstream, got: {:?}",
2583            names
2584        );
2585    }
2586
2587    #[test]
2588    fn test_lineage_round_function() {
2589        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
2590        let node = lineage("rounded", &expr, None, false).unwrap();
2591
2592        let names = node.downstream_names();
2593        assert!(
2594            names.iter().any(|n| n == "products.price"),
2595            "Expected products.price in downstream, got: {:?}",
2596            names
2597        );
2598    }
2599
2600    #[test]
2601    fn test_lineage_coalesce_function() {
2602        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
2603        let node = lineage("val", &expr, None, false).unwrap();
2604
2605        let names = node.downstream_names();
2606        assert!(
2607            names.iter().any(|n| n == "t.a"),
2608            "Expected t.a in downstream, got: {:?}",
2609            names
2610        );
2611        assert!(
2612            names.iter().any(|n| n == "t.b"),
2613            "Expected t.b in downstream, got: {:?}",
2614            names
2615        );
2616    }
2617
2618    #[test]
2619    fn test_lineage_count_function() {
2620        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
2621        let node = lineage("cnt", &expr, None, false).unwrap();
2622
2623        let names = node.downstream_names();
2624        assert!(
2625            names.iter().any(|n| n == "t.id"),
2626            "Expected t.id in downstream, got: {:?}",
2627            names
2628        );
2629    }
2630
2631    #[test]
2632    fn test_lineage_sum_function() {
2633        let expr = parse("SELECT SUM(amount) AS total FROM t");
2634        let node = lineage("total", &expr, None, false).unwrap();
2635
2636        let names = node.downstream_names();
2637        assert!(
2638            names.iter().any(|n| n == "t.amount"),
2639            "Expected t.amount in downstream, got: {:?}",
2640            names
2641        );
2642    }
2643
2644    #[test]
2645    fn test_lineage_case_with_nested_functions() {
2646        let expr =
2647            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
2648        let node = lineage("result", &expr, None, false).unwrap();
2649
2650        let names = node.downstream_names();
2651        assert!(
2652            names.iter().any(|n| n == "t.x"),
2653            "Expected t.x in downstream, got: {:?}",
2654            names
2655        );
2656        assert!(
2657            names.iter().any(|n| n == "t.name"),
2658            "Expected t.name in downstream, got: {:?}",
2659            names
2660        );
2661    }
2662
2663    #[test]
2664    fn test_lineage_substring_function() {
2665        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
2666        let node = lineage("short", &expr, None, false).unwrap();
2667
2668        let names = node.downstream_names();
2669        assert!(
2670            names.iter().any(|n| n == "t.name"),
2671            "Expected t.name in downstream, got: {:?}",
2672            names
2673        );
2674    }
2675
2676    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
2677
2678    /// sqlglot: test_node_name_doesnt_contain_comment
2679    /// Comments in column expressions should not affect lineage resolution.
2680    /// NOTE: This test uses SELECT * from a derived table, which is a separate
2681    /// known limitation in polyglot-sql (star expansion in subqueries).
2682    #[test]
2683    #[ignore = "requires derived table star expansion (separate issue)"]
2684    fn test_node_name_doesnt_contain_comment() {
2685        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
2686        let node = lineage("x", &expr, None, false).unwrap();
2687
2688        assert_eq!(node.name, "x");
2689        assert!(!node.downstream.is_empty());
2690    }
2691
2692    /// A line comment between SELECT and the first column wraps the column
2693    /// in an Annotated node. Lineage must unwrap it to find the column name.
2694    /// Verify that commented and uncommented queries produce identical lineage.
2695    #[test]
2696    fn test_comment_before_first_column_in_cte() {
2697        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
2698        let sql_without_comment = "with t as (select 1 as a) select a from t";
2699
2700        // Without comment — baseline
2701        let expr_ok = parse(sql_without_comment);
2702        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
2703
2704        // With comment — should produce identical lineage
2705        let expr_comment = parse(sql_with_comment);
2706        let node_comment = lineage("a", &expr_comment, None, false)
2707            .expect("with comment before first column should succeed");
2708
2709        assert_eq!(node_ok.name, node_comment.name, "node names should match");
2710        assert_eq!(
2711            node_ok.downstream_names(),
2712            node_comment.downstream_names(),
2713            "downstream lineage should be identical with or without comment"
2714        );
2715    }
2716
2717    /// Block comment between SELECT and first column.
2718    #[test]
2719    fn test_block_comment_before_first_column() {
2720        let sql = "with t as (select 1 as a) select /* section */ a from t";
2721        let expr = parse(sql);
2722        let node = lineage("a", &expr, None, false)
2723            .expect("block comment before first column should succeed");
2724        assert_eq!(node.name, "a");
2725        assert!(
2726            !node.downstream.is_empty(),
2727            "should have downstream lineage"
2728        );
2729    }
2730
2731    /// Comment before first column should not affect second column resolution.
2732    #[test]
2733    fn test_comment_before_first_column_second_col_ok() {
2734        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
2735        let expr = parse(sql);
2736
2737        let node_a =
2738            lineage("a", &expr, None, false).expect("column a with comment should succeed");
2739        assert_eq!(node_a.name, "a");
2740
2741        let node_b =
2742            lineage("b", &expr, None, false).expect("column b with comment should succeed");
2743        assert_eq!(node_b.name, "b");
2744    }
2745
2746    /// Aliased column with preceding comment.
2747    #[test]
2748    fn test_comment_before_aliased_column() {
2749        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
2750        let expr = parse(sql);
2751        let node =
2752            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
2753        assert_eq!(node.name, "y");
2754        assert!(
2755            !node.downstream.is_empty(),
2756            "aliased column should have downstream lineage"
2757        );
2758    }
2759}