Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, JoinKind, NamedWindow, Select};
10use crate::generator::Generator;
11use crate::optimizer::annotate_types::annotate_types;
12use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
13use crate::schema::{normalize_name, Schema};
14use crate::scope::{
15    build_scope, find_all_in_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind,
16};
17use crate::{Error, Result};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet};
20
21/// A node in the column lineage graph
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct LineageNode {
24    /// Name of this lineage step (e.g., "table.column")
25    pub name: String,
26    /// The expression at this node
27    pub expression: Expression,
28    /// The source expression (the full query context)
29    pub source: Expression,
30    /// Downstream nodes that depend on this one
31    pub downstream: Vec<LineageNode>,
32    /// Optional source name (e.g., for derived tables)
33    pub source_name: String,
34    /// Semantic source kind for downstream consumers.
35    pub source_kind: SourceKind,
36    /// User-written source alias when different from canonical source name.
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub source_alias: Option<String>,
39    /// Optional reference node name (e.g., for CTEs)
40    pub reference_node_name: String,
41}
42
43impl LineageNode {
44    /// Create a new lineage node
45    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
46        Self {
47            name: name.into(),
48            expression,
49            source,
50            downstream: Vec::new(),
51            source_name: String::new(),
52            source_kind: SourceKind::Unknown,
53            source_alias: None,
54            reference_node_name: String::new(),
55        }
56    }
57
58    /// Iterate over all nodes in the lineage graph using DFS
59    pub fn walk(&self) -> LineageWalker<'_> {
60        LineageWalker { stack: vec![self] }
61    }
62
63    /// Get all downstream column names
64    pub fn downstream_names(&self) -> Vec<String> {
65        self.downstream.iter().map(|n| n.name.clone()).collect()
66    }
67}
68
69fn source_kind_for_scope_context(
70    scope: &Scope,
71    source_name: &str,
72    reference_node_name: &str,
73) -> SourceKind {
74    if source_name.is_empty() && reference_node_name.is_empty() {
75        return SourceKind::Root;
76    }
77    if let Some(source_info) = scope.sources.get(source_name) {
78        return source_info.kind;
79    }
80    if scope.cte_sources.contains_key(source_name) {
81        return SourceKind::Cte;
82    }
83    match scope.scope_type {
84        ScopeType::Cte => SourceKind::Cte,
85        ScopeType::DerivedTable => SourceKind::DerivedTable,
86        ScopeType::Udtf => SourceKind::Virtual,
87        _ => SourceKind::Unknown,
88    }
89}
90
91fn apply_scope_context(
92    node: &mut LineageNode,
93    scope: &Scope,
94    source_name: &str,
95    reference_node_name: &str,
96) {
97    node.source_name = source_name.to_string();
98    node.reference_node_name = reference_node_name.to_string();
99    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
100}
101
102/// Iterator for walking the lineage graph
103pub struct LineageWalker<'a> {
104    stack: Vec<&'a LineageNode>,
105}
106
107impl<'a> Iterator for LineageWalker<'a> {
108    type Item = &'a LineageNode;
109
110    fn next(&mut self) -> Option<Self::Item> {
111        if let Some(node) = self.stack.pop() {
112            // Add children in reverse order so they're visited in order
113            for child in node.downstream.iter().rev() {
114                self.stack.push(child);
115            }
116            Some(node)
117        } else {
118            None
119        }
120    }
121}
122
123// ---------------------------------------------------------------------------
124// ColumnRef: name or positional index for column lookup
125// ---------------------------------------------------------------------------
126
127/// Column reference for lineage tracing — by name or positional index.
128enum ColumnRef<'a> {
129    Name(&'a str),
130    Index(usize),
131}
132
133// ---------------------------------------------------------------------------
134// Public API
135// ---------------------------------------------------------------------------
136
137/// Build the lineage graph for a column in a SQL query
138///
139/// # Arguments
140/// * `column` - The column name to trace lineage for
141/// * `sql` - The SQL expression (SELECT, UNION, etc.)
142/// * `dialect` - Optional dialect for parsing
143/// * `trim_selects` - If true, trim the source SELECT to only include the target column
144///
145/// # Returns
146/// The root lineage node for the specified column
147///
148/// # Example
149/// ```ignore
150/// use polyglot_sql::lineage::lineage;
151/// use polyglot_sql::parse_one;
152/// use polyglot_sql::DialectType;
153///
154/// let sql = "SELECT a, b + 1 AS c FROM t";
155/// let expr = parse_one(sql, DialectType::Generic).unwrap();
156/// let node = lineage("c", &expr, None, false).unwrap();
157/// ```
158pub fn lineage(
159    column: &str,
160    sql: &Expression,
161    dialect: Option<DialectType>,
162    trim_selects: bool,
163) -> Result<LineageNode> {
164    let mut owned = lineage_normalized_expression(sql);
165    // Fast path: skip clone when there are no CTEs to expand
166    if has_lineage_with_clause(&owned) {
167        expand_cte_stars(&mut owned, None);
168    }
169    lineage_from_expression(column, &owned, dialect, trim_selects)
170}
171
172/// Build the lineage graph for a column in a SQL query using optional schema metadata.
173///
174/// When `schema` is provided, the query is first qualified with
175/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
176/// ambiguous column references.
177///
178/// # Arguments
179/// * `column` - The column name to trace lineage for
180/// * `sql` - The SQL expression (SELECT, UNION, etc.)
181/// * `schema` - Optional schema used for qualification
182/// * `dialect` - Optional dialect for qualification and lineage handling
183/// * `trim_selects` - If true, trim the source SELECT to only include the target column
184///
185/// # Returns
186/// The root lineage node for the specified column
187pub fn lineage_with_schema(
188    column: &str,
189    sql: &Expression,
190    schema: Option<&dyn Schema>,
191    dialect: Option<DialectType>,
192    trim_selects: bool,
193) -> Result<LineageNode> {
194    let normalized_expression = lineage_normalized_expression(sql);
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new().with_dialect(dialect_type)
198        } else {
199            QualifyColumnsOptions::new()
200        };
201
202        qualify_columns(normalized_expression.clone(), schema, &options).map_err(|e| {
203            Error::internal(format!("Lineage qualification failed with schema: {}", e))
204        })?
205    } else {
206        normalized_expression
207    };
208
209    // Annotate types in-place so lineage nodes carry type information
210    annotate_types(&mut qualified_expression, schema, dialect);
211
212    // Expand CTE stars on the already-owned expression (no extra clone).
213    // Pass schema so that stars from external tables can also be resolved.
214    expand_cte_stars(&mut qualified_expression, schema);
215
216    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
217}
218
219fn lineage_from_expression(
220    column: &str,
221    sql: &Expression,
222    dialect: Option<DialectType>,
223    trim_selects: bool,
224) -> Result<LineageNode> {
225    let scope = build_scope(sql);
226    to_node(
227        ColumnRef::Name(column),
228        &scope,
229        dialect,
230        "",
231        "",
232        "",
233        trim_selects,
234    )
235}
236
237pub(crate) fn lineage_by_index_from_expression(
238    column_index: usize,
239    sql: &Expression,
240    dialect: Option<DialectType>,
241    trim_selects: bool,
242) -> Result<LineageNode> {
243    let normalized = lineage_normalized_expression(sql);
244    let scope = build_scope(&normalized);
245    to_node(
246        ColumnRef::Index(column_index),
247        &scope,
248        dialect,
249        "",
250        "",
251        "",
252        trim_selects,
253    )
254}
255
256fn lineage_normalized_expression(sql: &Expression) -> Expression {
257    match sql {
258        Expression::Prepare(prepare) => lineage_normalized_expression(&prepare.statement),
259        Expression::CreateTable(create) => create
260            .as_select
261            .as_ref()
262            .map(|query| attach_with_to_query(query.clone(), create.with_cte.clone()))
263            .unwrap_or_else(|| sql.clone()),
264        Expression::CreateView(create) => lineage_normalized_expression(&create.query),
265        Expression::Insert(insert) => insert
266            .query
267            .as_ref()
268            .map(|query| attach_with_to_query(query.clone(), insert.with.clone()))
269            .unwrap_or_else(|| sql.clone()),
270        _ => sql.clone(),
271    }
272}
273
274fn attach_with_to_query(
275    mut query: Expression,
276    with: Option<crate::expressions::With>,
277) -> Expression {
278    if let Some(with) = with {
279        attach_with_to_query_mut(&mut query, with);
280    }
281    query
282}
283
284fn attach_with_to_query_mut(query: &mut Expression, with: crate::expressions::With) {
285    match query {
286        Expression::Select(select) => {
287            if select.with.is_none() {
288                select.with = Some(with);
289            }
290        }
291        Expression::Union(union) => {
292            if union.with.is_none() {
293                union.with = Some(with);
294            }
295        }
296        Expression::Intersect(intersect) => {
297            if intersect.with.is_none() {
298                intersect.with = Some(with);
299            }
300        }
301        Expression::Except(except) => {
302            if except.with.is_none() {
303                except.with = Some(with);
304            }
305        }
306        Expression::Paren(paren) => attach_with_to_query_mut(&mut paren.this, with),
307        _ => {}
308    }
309}
310
311fn has_lineage_with_clause(expr: &Expression) -> bool {
312    match expr {
313        Expression::Select(select) => select.with.is_some(),
314        Expression::Union(union) => {
315            union.with.is_some()
316                || has_lineage_with_clause(&union.left)
317                || has_lineage_with_clause(&union.right)
318        }
319        Expression::Intersect(intersect) => {
320            intersect.with.is_some()
321                || has_lineage_with_clause(&intersect.left)
322                || has_lineage_with_clause(&intersect.right)
323        }
324        Expression::Except(except) => {
325            except.with.is_some()
326                || has_lineage_with_clause(&except.left)
327                || has_lineage_with_clause(&except.right)
328        }
329        Expression::Paren(paren) => has_lineage_with_clause(&paren.this),
330        _ => false,
331    }
332}
333
334// ---------------------------------------------------------------------------
335// CTE star expansion
336// ---------------------------------------------------------------------------
337
338/// Normalize an identifier for CTE name matching.
339///
340/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
341/// quoted identifiers preserve their original case. This matches sqlglot's
342/// `normalize_identifiers` behavior.
343fn normalize_cte_name(ident: &Identifier) -> String {
344    if ident.quoted {
345        ident.name.clone()
346    } else {
347        ident.name.to_lowercase()
348    }
349}
350
351/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
352/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
353/// which qualify_columns cannot resolve because it processes each SELECT independently.
354///
355/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
356/// by looking up column names in the schema. This enables correct expansion of patterns
357/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
358///
359/// CTE name matching follows SQL identifier semantics: unquoted names are compared
360/// case-insensitively (lowercased), while quoted names preserve their original case.
361/// This matches sqlglot's `normalize_identifiers` behavior.
362pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
363    if let Expression::Prepare(prepare) = expr {
364        expand_cte_stars(&mut prepare.statement, schema);
365        return;
366    }
367
368    let select = match expr {
369        Expression::Select(s) => s,
370        _ => return,
371    };
372
373    let with = match &mut select.with {
374        Some(w) => w,
375        None => return,
376    };
377
378    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
379
380    for cte in &mut with.ctes {
381        let cte_name = normalize_cte_name(&cte.alias);
382
383        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
384        if !cte.columns.is_empty() {
385            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
386            resolved_cte_columns.insert(cte_name, cols);
387            continue;
388        }
389
390        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
391        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
392        // references itself. We detect this conservatively: if the CTE name appears as
393        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
394        // block are still expanded.
395        if with.recursive {
396            let is_self_referencing =
397                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
398                    let body_sources = get_select_sources(body_select);
399                    body_sources.iter().any(|s| s.normalized == cte_name)
400                } else {
401                    false
402                };
403            if is_self_referencing {
404                continue;
405            }
406        }
407
408        // Get the SELECT from the CTE body (handle UNION by taking left branch)
409        let body_select = match get_leftmost_select_mut(&mut cte.this) {
410            Some(s) => s,
411            None => continue,
412        };
413
414        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
415        resolved_cte_columns.insert(cte_name, columns);
416    }
417
418    // Also expand stars in the outer SELECT itself
419    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
420}
421
422/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
423///
424/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
425/// are determined by the left branch. This matches sqlglot's behavior.
426fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
427    let mut current = expr;
428    for _ in 0..MAX_LINEAGE_DEPTH {
429        match current {
430            Expression::Select(s) => return Some(s),
431            Expression::Union(u) => current = &mut u.left,
432            Expression::Intersect(i) => current = &mut i.left,
433            Expression::Except(e) => current = &mut e.left,
434            Expression::Paren(p) => current = &mut p.this,
435            _ => return None,
436        }
437    }
438    None
439}
440
441/// Rewrite star expressions in a SELECT using resolved CTE column lists.
442/// Falls back to `schema` for external table column lookup.
443/// Returns the list of output column names after expansion.
444fn rewrite_stars_in_select(
445    select: &mut Select,
446    resolved_ctes: &HashMap<String, Vec<String>>,
447    schema: Option<&dyn Schema>,
448) -> Vec<String> {
449    // The AST represents star expressions in two forms depending on syntax:
450    //   - `SELECT *`      → Expression::Star (unqualified star)
451    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
452    // Both must be checked to handle all star patterns.
453    let has_star = select
454        .expressions
455        .iter()
456        .any(|e| matches!(e, Expression::Star(_)));
457    let has_qualified_star = select
458        .expressions
459        .iter()
460        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
461
462    if !has_star && !has_qualified_star {
463        // No stars — just extract column names without rewriting
464        return select
465            .expressions
466            .iter()
467            .filter_map(get_expression_output_name)
468            .collect();
469    }
470
471    let sources = get_select_sources(select);
472    let mut new_expressions = Vec::new();
473    let mut result_columns = Vec::new();
474
475    for expr in &select.expressions {
476        match expr {
477            Expression::Star(star) => {
478                let qual = star.table.as_ref();
479                if let Some(expanded) =
480                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
481                {
482                    for (src_alias, col_name) in &expanded {
483                        let table_id = Identifier::new(src_alias);
484                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
485                        result_columns.push(col_name.clone());
486                    }
487                } else {
488                    new_expressions.push(expr.clone());
489                    result_columns.push("*".to_string());
490                }
491            }
492            Expression::Column(c) if c.name.name == "*" => {
493                let qual = c.table.as_ref();
494                if let Some(expanded) =
495                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
496                {
497                    for (_src_alias, col_name) in &expanded {
498                        // Keep the original table qualifier for qualified stars (table.*)
499                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
500                        result_columns.push(col_name.clone());
501                    }
502                } else {
503                    new_expressions.push(expr.clone());
504                    result_columns.push("*".to_string());
505                }
506            }
507            _ => {
508                new_expressions.push(expr.clone());
509                if let Some(name) = get_expression_output_name(expr) {
510                    result_columns.push(name);
511                }
512            }
513        }
514    }
515
516    select.expressions = new_expressions;
517    result_columns
518}
519
520/// Try to expand a star expression by looking up source columns from resolved CTEs,
521/// falling back to the schema for external tables.
522/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
523/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
524fn expand_star_from_sources(
525    qualifier: Option<&Identifier>,
526    sources: &[SourceInfo],
527    resolved_ctes: &HashMap<String, Vec<String>>,
528    schema: Option<&dyn Schema>,
529) -> Option<Vec<(String, String)>> {
530    let mut expanded = Vec::new();
531
532    if let Some(qual) = qualifier {
533        // Qualified star: table.*
534        let qual_normalized = normalize_cte_name(qual);
535        for src in sources {
536            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
537                // Try CTE first
538                if let Some(cols) = resolved_ctes.get(&src.normalized) {
539                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
540                    return Some(expanded);
541                }
542                // Fall back to schema
543                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
544                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
545                    return Some(expanded);
546                }
547            }
548        }
549        None
550    } else {
551        // Unqualified star: expand all sources.
552        // Intentionally conservative: if any source can't be resolved, the entire
553        // expansion is aborted. Partial expansion would produce an incomplete column
554        // list, causing downstream lineage resolution to silently omit columns.
555        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
556        let mut any_expanded = false;
557        for src in sources {
558            if let Some(cols) = resolved_ctes.get(&src.normalized) {
559                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
560                any_expanded = true;
561            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
562                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
563                any_expanded = true;
564            } else {
565                return None;
566            }
567        }
568        if any_expanded {
569            Some(expanded)
570        } else {
571            None
572        }
573    }
574}
575
576/// Look up column names for a table from the schema.
577fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
578    let schema = schema?;
579    if fq_name.is_empty() {
580        return None;
581    }
582    schema
583        .column_names(fq_name)
584        .ok()
585        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
586}
587
588/// Create a Column expression with the given name and optional table qualifier.
589fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
590    Expression::Column(Box::new(crate::expressions::Column {
591        name: Identifier::new(name),
592        table: table.cloned(),
593        join_mark: false,
594        trailing_comments: Vec::new(),
595        span: None,
596        inferred_type: None,
597    }))
598}
599
600/// Extract the output name of a SELECT expression.
601fn get_expression_output_name(expr: &Expression) -> Option<String> {
602    match expr {
603        Expression::Alias(a) => Some(a.alias.name.clone()),
604        Expression::Column(c) => Some(c.name.name.clone()),
605        Expression::Identifier(id) => Some(id.name.clone()),
606        Expression::Star(_) => Some("*".to_string()),
607        _ => None,
608    }
609}
610
611/// Source info extracted from a SELECT's FROM/JOIN clauses in a single pass.
612struct SourceInfo {
613    alias: String,
614    /// Whether this source was introduced through a quoted identifier.
615    ///
616    /// The schema-less star passthrough heuristic must stay conservative for
617    /// quoted sources because unresolved quoted table names can be distinct
618    /// from similarly named CTEs that older scope paths still compare
619    /// case-insensitively.
620    quoted: bool,
621    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
622    normalized: String,
623    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
624    fq_name: String,
625}
626
627/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
628/// SELECT's FROM and JOIN clauses in a single pass.
629fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
630    let mut sources = Vec::new();
631
632    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
633        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
634            SourceInfo {
635                alias: alias.name.clone(),
636                quoted: alias.quoted,
637                normalized: normalize_cte_name(alias),
638                fq_name: alias.name.clone(),
639            }
640        }
641
642        fn named_virtual_source_info(alias: &str) -> SourceInfo {
643            SourceInfo {
644                alias: alias.to_string(),
645                quoted: false,
646                normalized: alias.to_lowercase(),
647                fq_name: alias.to_string(),
648            }
649        }
650
651        match expr {
652            Expression::Table(t) => {
653                let normalized = normalize_cte_name(&t.name);
654                let alias = t
655                    .alias
656                    .as_ref()
657                    .map(|a| a.name.clone())
658                    .unwrap_or_else(|| t.name.name.clone());
659                let mut parts = Vec::new();
660                if let Some(catalog) = &t.catalog {
661                    parts.push(catalog.name.clone());
662                }
663                if let Some(schema) = &t.schema {
664                    parts.push(schema.name.clone());
665                }
666                parts.push(t.name.name.clone());
667                let fq_name = parts.join(".");
668                Some(SourceInfo {
669                    alias,
670                    quoted: t.name.quoted,
671                    normalized,
672                    fq_name,
673                })
674            }
675            Expression::Subquery(s) => {
676                let alias_identifier = s.alias.as_ref()?;
677                let alias = alias_identifier.name.clone();
678                let normalized = alias.to_lowercase();
679                let fq_name = alias.clone();
680                Some(SourceInfo {
681                    alias,
682                    quoted: alias_identifier.quoted,
683                    normalized,
684                    fq_name,
685                })
686            }
687            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
688            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
689                Some(virtual_source_info(&a.alias))
690            }
691            Expression::Alias(a) if is_query_like_relation(&a.this) => {
692                Some(virtual_source_info(&a.alias))
693            }
694            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
695            Expression::LateralView(lateral_view) => lateral_view
696                .table_alias
697                .as_ref()
698                .or_else(|| lateral_view.column_aliases.first())
699                .map(virtual_source_info),
700            Expression::Pivot(pivot) => {
701                let alias = pivot_lineage_source_name(
702                    &pivot.this,
703                    pivot.alias.as_ref().map(|alias| alias.name.as_str()),
704                );
705                Some(SourceInfo {
706                    alias: alias.clone(),
707                    quoted: false,
708                    normalized: alias.to_lowercase(),
709                    fq_name: alias,
710                })
711            }
712            Expression::Unpivot(unpivot) => {
713                let alias = pivot_lineage_source_name(
714                    &unpivot.this,
715                    unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
716                );
717                Some(SourceInfo {
718                    alias: alias.clone(),
719                    quoted: false,
720                    normalized: alias.to_lowercase(),
721                    fq_name: alias,
722                })
723            }
724            Expression::Paren(p) => extract_source(&p.this),
725            _ => None,
726        }
727    }
728
729    if let Some(from) = &select.from {
730        for expr in &from.expressions {
731            if let Some(info) = extract_source(expr) {
732                sources.push(info);
733            }
734        }
735    }
736    for join in &select.joins {
737        if is_semi_or_anti_join_kind(join.kind) {
738            continue;
739        }
740        if let Some(info) = extract_source(&join.this) {
741            sources.push(info);
742        }
743    }
744    for lateral_view in &select.lateral_views {
745        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
746        {
747            sources.push(info);
748        }
749    }
750    sources
751}
752
753fn pivot_lineage_source_name(source: &Expression, explicit_alias: Option<&str>) -> String {
754    if let Some(alias) = explicit_alias {
755        return alias.to_string();
756    }
757
758    match source {
759        Expression::Table(table) => table
760            .alias
761            .as_ref()
762            .map(|alias| alias.name.clone())
763            .unwrap_or_else(|| table.name.name.clone()),
764        Expression::Subquery(subquery) => subquery
765            .alias
766            .as_ref()
767            .map(|alias| alias.name.clone())
768            .unwrap_or_else(|| "_0".to_string()),
769        Expression::Paren(paren) => pivot_lineage_source_name(&paren.this, explicit_alias),
770        _ => "_0".to_string(),
771    }
772}
773
774/// Get all source tables from a lineage graph
775pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
776    let mut tables = HashSet::new();
777    collect_source_tables(node, &mut tables);
778    tables
779}
780
781/// Recursively collect source table names from lineage graph
782pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
783    if let Expression::Table(table) = &node.source {
784        tables.insert(table.name.name.clone());
785    }
786    for child in &node.downstream {
787        collect_source_tables(child, tables);
788    }
789}
790
791// ---------------------------------------------------------------------------
792// Core recursive lineage builder
793// ---------------------------------------------------------------------------
794
795/// Maximum recursion depth for lineage tracing to prevent stack overflow
796/// on circular or deeply nested CTE chains.
797const MAX_LINEAGE_DEPTH: usize = 64;
798
799/// Recursively build a lineage node for a column in a scope.
800fn to_node(
801    column: ColumnRef<'_>,
802    scope: &Scope,
803    dialect: Option<DialectType>,
804    scope_name: &str,
805    source_name: &str,
806    reference_node_name: &str,
807    trim_selects: bool,
808) -> Result<LineageNode> {
809    to_node_inner(
810        column,
811        scope,
812        dialect,
813        scope_name,
814        source_name,
815        reference_node_name,
816        trim_selects,
817        &[],
818        0,
819    )
820}
821
822fn to_node_inner(
823    column: ColumnRef<'_>,
824    scope: &Scope,
825    dialect: Option<DialectType>,
826    scope_name: &str,
827    source_name: &str,
828    reference_node_name: &str,
829    trim_selects: bool,
830    ancestor_cte_scopes: &[Scope],
831    depth: usize,
832) -> Result<LineageNode> {
833    if depth > MAX_LINEAGE_DEPTH {
834        return Err(Error::internal(format!(
835            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
836        )));
837    }
838    let scope_expr = &scope.expression;
839
840    // Build combined CTE scopes: current scope's cte_scopes + ancestors
841    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
842    for s in ancestor_cte_scopes {
843        all_cte_scopes.push(s);
844    }
845    let descendant_cte_scopes = descendant_cte_scope_clones(&all_cte_scopes, scope);
846
847    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
848    //    but we need the inner query (SELECT/UNION) for column lookup.
849    let effective_expr = effective_scope_expression(scope_expr);
850
851    // 1. Set operations (UNION / INTERSECT / EXCEPT)
852    if matches!(
853        effective_expr,
854        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
855    ) {
856        // For CTE wrapping a set op, create a temporary scope with the inner expression
857        if matches!(scope_expr, Expression::Cte(_)) {
858            let mut inner_scope = Scope::new(effective_expr.clone());
859            inner_scope.union_scopes = scope.union_scopes.clone();
860            inner_scope.sources = scope.sources.clone();
861            inner_scope.cte_sources = scope.cte_sources.clone();
862            inner_scope.cte_scopes = scope.cte_scopes.clone();
863            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
864            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
865            return handle_set_operation(
866                &column,
867                &inner_scope,
868                dialect,
869                scope_name,
870                source_name,
871                reference_node_name,
872                trim_selects,
873                &descendant_cte_scopes,
874                depth,
875            );
876        }
877        return handle_set_operation(
878            &column,
879            scope,
880            dialect,
881            scope_name,
882            source_name,
883            reference_node_name,
884            trim_selects,
885            &descendant_cte_scopes,
886            depth,
887        );
888    }
889
890    // 2. Find the select expression for this column
891    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
892    let column_name = resolve_column_name(&column, &select_expr);
893
894    // 3. Trim source if requested
895    let node_source = if trim_selects {
896        trim_source(effective_expr, &select_expr)
897    } else {
898        effective_expr.clone()
899    };
900
901    // 4. Create the lineage node
902    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
903    apply_scope_context(&mut node, scope, source_name, reference_node_name);
904
905    // 5. Star handling — add downstream for each source
906    if let Expression::Star(star) = &select_expr {
907        let star_table = star
908            .table
909            .as_ref()
910            .map(|identifier| identifier.name.as_str());
911        for (name, source_info) in &scope.sources {
912            if let Some(star_table) = star_table {
913                let table_matches = name.eq_ignore_ascii_case(star_table)
914                    || source_info
915                        .alias
916                        .as_deref()
917                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
918                    || matches!(
919                        &source_info.expression,
920                        Expression::Table(table_ref)
921                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
922                    );
923                if !table_matches {
924                    continue;
925                }
926            }
927
928            let mut child = LineageNode::new(
929                format!("{}.*", name),
930                Expression::Star(crate::expressions::Star {
931                    table: star.table.clone(),
932                    except: None,
933                    replace: None,
934                    rename: None,
935                    trailing_comments: vec![],
936                    span: None,
937                }),
938                source_info.expression.clone(),
939            );
940            apply_source_info_context(&mut child, name, source_info);
941            node.downstream.push(child);
942        }
943        return Ok(node);
944    }
945
946    // 6. Subqueries in select — trace through scalar subqueries
947    for query in query_expressions_in_scope(&select_expr) {
948        for sq_scope in &scope.subquery_scopes {
949            if sq_scope.expression == *query {
950                if let Ok(child) = to_node_inner(
951                    ColumnRef::Index(0),
952                    sq_scope,
953                    dialect,
954                    &column_name,
955                    "",
956                    "",
957                    trim_selects,
958                    &descendant_cte_scopes,
959                    depth + 1,
960                ) {
961                    node.downstream.push(child);
962                }
963                break;
964            }
965        }
966    }
967
968    // 7. Column references — trace each column to its source
969    let col_refs = find_column_refs_in_expr_with_select(&select_expr, effective_expr, dialect);
970    for col_ref in col_refs {
971        let col_name = &col_ref.column;
972        if let Some(ref table_id) = col_ref.table {
973            let tbl = &table_id.name;
974            resolve_qualified_column(
975                &mut node,
976                scope,
977                dialect,
978                tbl,
979                col_name,
980                &column_name,
981                trim_selects,
982                &all_cte_scopes,
983                depth,
984            );
985        } else {
986            if let Some(alias_expr) =
987                find_prior_select_alias_expr(effective_expr, &select_expr, col_name, dialect)
988            {
989                for alias_ref in
990                    find_column_refs_in_expr_with_select(&alias_expr, effective_expr, dialect)
991                {
992                    if let Some(ref table_id) = alias_ref.table {
993                        resolve_qualified_column(
994                            &mut node,
995                            scope,
996                            dialect,
997                            &table_id.name,
998                            &alias_ref.column,
999                            &column_name,
1000                            trim_selects,
1001                            &all_cte_scopes,
1002                            depth,
1003                        );
1004                    } else {
1005                        resolve_unqualified_column(
1006                            &mut node,
1007                            scope,
1008                            dialect,
1009                            &alias_ref.column,
1010                            &column_name,
1011                            trim_selects,
1012                            &all_cte_scopes,
1013                            depth,
1014                        );
1015                    }
1016                }
1017                continue;
1018            }
1019
1020            resolve_unqualified_column(
1021                &mut node,
1022                scope,
1023                dialect,
1024                col_name,
1025                &column_name,
1026                trim_selects,
1027                &all_cte_scopes,
1028                depth,
1029            );
1030        }
1031    }
1032
1033    Ok(node)
1034}
1035
1036fn descendant_cte_scope_clones(all_cte_scopes: &[&Scope], current_scope: &Scope) -> Vec<Scope> {
1037    all_cte_scopes
1038        .iter()
1039        .filter(|scope| scope.expression != current_scope.expression)
1040        .map(|scope| (*scope).clone())
1041        .collect()
1042}
1043
1044fn effective_scope_expression(expr: &Expression) -> &Expression {
1045    match expr {
1046        Expression::Cte(cte) => effective_scope_expression(&cte.this),
1047        Expression::Subquery(subquery) => effective_scope_expression(&subquery.this),
1048        Expression::Paren(paren) => effective_scope_expression(&paren.this),
1049        other => other,
1050    }
1051}
1052
1053fn query_expressions_in_scope(expr: &Expression) -> Vec<&Expression> {
1054    let mut queries = Vec::new();
1055    let mut seen = HashSet::new();
1056
1057    for node in find_all_in_scope(
1058        expr,
1059        |node| {
1060            matches!(
1061                node,
1062                Expression::Subquery(subquery) if subquery.alias.is_none()
1063            ) || matches!(
1064                node,
1065                Expression::Exists(_) | Expression::In(_) | Expression::Any(_) | Expression::All(_)
1066            )
1067        },
1068        false,
1069    ) {
1070        let query = match node {
1071            Expression::Subquery(subquery) if subquery.alias.is_none() => Some(&subquery.this),
1072            Expression::Exists(exists) => Some(&exists.this),
1073            Expression::In(in_expr) => in_expr.query.as_ref(),
1074            Expression::Any(quantified) | Expression::All(quantified) => Some(&quantified.subquery),
1075            _ => None,
1076        };
1077
1078        if let Some(query) = query {
1079            let key = query as *const Expression as usize;
1080            if seen.insert(key) {
1081                queries.push(query);
1082            }
1083        }
1084    }
1085
1086    queries
1087}
1088
1089// ---------------------------------------------------------------------------
1090// Set operation handling
1091// ---------------------------------------------------------------------------
1092
1093fn handle_set_operation(
1094    column: &ColumnRef<'_>,
1095    scope: &Scope,
1096    dialect: Option<DialectType>,
1097    scope_name: &str,
1098    source_name: &str,
1099    reference_node_name: &str,
1100    trim_selects: bool,
1101    ancestor_cte_scopes: &[Scope],
1102    depth: usize,
1103) -> Result<LineageNode> {
1104    let scope_expr = &scope.expression;
1105
1106    // Determine column index
1107    let col_index = match column {
1108        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
1109        ColumnRef::Index(i) => *i,
1110    };
1111
1112    let col_name = match column {
1113        ColumnRef::Name(name) => name.to_string(),
1114        ColumnRef::Index(_) => format!("_{col_index}"),
1115    };
1116
1117    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
1118    apply_scope_context(&mut node, scope, source_name, reference_node_name);
1119
1120    // Recurse into each union branch
1121    for branch_scope in &scope.union_scopes {
1122        if let Ok(child) = to_node_inner(
1123            ColumnRef::Index(col_index),
1124            branch_scope,
1125            dialect,
1126            scope_name,
1127            "",
1128            "",
1129            trim_selects,
1130            ancestor_cte_scopes,
1131            depth + 1,
1132        ) {
1133            node.downstream.push(child);
1134        }
1135    }
1136
1137    Ok(node)
1138}
1139
1140// ---------------------------------------------------------------------------
1141// Column resolution helpers
1142// ---------------------------------------------------------------------------
1143
1144fn resolve_qualified_column(
1145    node: &mut LineageNode,
1146    scope: &Scope,
1147    dialect: Option<DialectType>,
1148    table: &str,
1149    col_name: &str,
1150    parent_name: &str,
1151    trim_selects: bool,
1152    all_cte_scopes: &[&Scope],
1153    depth: usize,
1154) {
1155    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
1156    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
1157    let resolved_cte_name = resolve_cte_alias(scope, table);
1158    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
1159
1160    if let Some(source_info) = scope
1161        .sources
1162        .get(table)
1163        .or_else(|| scope.sources.get(effective_table))
1164    {
1165        match &source_info.expression {
1166            Expression::Pivot(pivot) => {
1167                if attach_pivot_dependencies(
1168                    node,
1169                    scope,
1170                    dialect,
1171                    pivot,
1172                    col_name,
1173                    trim_selects,
1174                    all_cte_scopes,
1175                    depth,
1176                ) {
1177                    return;
1178                }
1179            }
1180            Expression::Unpivot(unpivot) => {
1181                if attach_unpivot_dependencies(
1182                    node,
1183                    scope,
1184                    dialect,
1185                    unpivot,
1186                    col_name,
1187                    trim_selects,
1188                    all_cte_scopes,
1189                    depth,
1190                ) {
1191                    return;
1192                }
1193            }
1194            _ => {}
1195        }
1196    }
1197
1198    // Check if table is a CTE reference — check both the current scope's cte_sources
1199    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
1200    let is_cte = scope.cte_sources.contains_key(effective_table)
1201        || all_cte_scopes.iter().any(
1202            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
1203        );
1204    if is_cte {
1205        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
1206            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
1207            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
1208            if let Ok(child) = to_node_inner(
1209                ColumnRef::Name(col_name),
1210                child_scope,
1211                dialect,
1212                parent_name,
1213                effective_table,
1214                parent_name,
1215                trim_selects,
1216                &ancestors,
1217                depth + 1,
1218            ) {
1219                node.downstream.push(child);
1220                return;
1221            }
1222        }
1223
1224        if let Some(source_info) = scope
1225            .sources
1226            .get(table)
1227            .or_else(|| scope.sources.get(effective_table))
1228            .filter(|source_info| source_info.kind == SourceKind::Cte)
1229        {
1230            node.downstream.push(make_table_column_node_from_source(
1231                effective_table,
1232                col_name,
1233                source_info,
1234            ));
1235            return;
1236        }
1237    }
1238
1239    // Check if table is a derived table (is_scope = true in sources)
1240    if let Some(source_info) = scope.sources.get(table) {
1241        if source_info.is_scope {
1242            if let Some(child_scope) = find_child_scope(scope, table) {
1243                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
1244                if let Ok(child) = to_node_inner(
1245                    ColumnRef::Name(col_name),
1246                    child_scope,
1247                    dialect,
1248                    parent_name,
1249                    table,
1250                    parent_name,
1251                    trim_selects,
1252                    &ancestors,
1253                    depth + 1,
1254                ) {
1255                    node.downstream.push(child);
1256                    return;
1257                }
1258            }
1259        }
1260    }
1261
1262    // Base table source found in current scope: preserve alias in the display name
1263    // but store the resolved table expression and name for downstream consumers.
1264    if let Some(source_info) = scope.sources.get(table) {
1265        if !source_info.is_scope {
1266            let mut child = make_table_column_node_from_source(table, col_name, source_info);
1267            if source_info.kind == SourceKind::Virtual {
1268                attach_virtual_source_dependencies(
1269                    &mut child,
1270                    scope,
1271                    dialect,
1272                    table,
1273                    &source_info.expression,
1274                    trim_selects,
1275                    all_cte_scopes,
1276                    depth,
1277                );
1278            }
1279            node.downstream.push(child);
1280            return;
1281        }
1282    }
1283
1284    // Base table or unresolved — terminal node
1285    node.downstream
1286        .push(make_table_column_node(table, col_name));
1287}
1288
1289fn attach_pivot_dependencies(
1290    node: &mut LineageNode,
1291    scope: &Scope,
1292    dialect: Option<DialectType>,
1293    pivot: &crate::expressions::Pivot,
1294    col_name: &str,
1295    trim_selects: bool,
1296    all_cte_scopes: &[&Scope],
1297    depth: usize,
1298) -> bool {
1299    if pivot.unpivot {
1300        return false;
1301    }
1302
1303    let mapping = pivot_lineage_column_mapping(pivot, scope, dialect);
1304    let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1305        if pivot_implicit_source_column(pivot, col_name) {
1306            let col_ref = SimpleColumnRef {
1307                table: None,
1308                column: col_name.to_string(),
1309            };
1310            attach_pivot_input_column(
1311                node,
1312                scope,
1313                dialect,
1314                &pivot.this,
1315                &col_ref,
1316                trim_selects,
1317                all_cte_scopes,
1318                depth,
1319            );
1320            return true;
1321        }
1322        return false;
1323    };
1324
1325    for col_ref in input_columns {
1326        attach_pivot_input_column(
1327            node,
1328            scope,
1329            dialect,
1330            &pivot.this,
1331            col_ref,
1332            trim_selects,
1333            all_cte_scopes,
1334            depth,
1335        );
1336    }
1337    true
1338}
1339
1340fn attach_unpivot_dependencies(
1341    node: &mut LineageNode,
1342    scope: &Scope,
1343    dialect: Option<DialectType>,
1344    unpivot: &crate::expressions::Unpivot,
1345    col_name: &str,
1346    trim_selects: bool,
1347    all_cte_scopes: &[&Scope],
1348    depth: usize,
1349) -> bool {
1350    let mapping = unpivot_column_mapping(unpivot, dialect);
1351    let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1352        return false;
1353    };
1354
1355    for col_ref in input_columns {
1356        attach_pivot_input_column(
1357            node,
1358            scope,
1359            dialect,
1360            &unpivot.this,
1361            col_ref,
1362            trim_selects,
1363            all_cte_scopes,
1364            depth,
1365        );
1366    }
1367    true
1368}
1369
1370fn pivot_column_mapping(
1371    pivot: &crate::expressions::Pivot,
1372    dialect: Option<DialectType>,
1373) -> HashMap<String, Vec<SimpleColumnRef>> {
1374    let aggregations = pivot_aggregation_expressions(pivot);
1375    let output_columns = pivot_generated_output_columns(pivot, dialect);
1376    if aggregations.is_empty() || output_columns.is_empty() {
1377        return HashMap::new();
1378    }
1379
1380    let mut mapping = HashMap::new();
1381    for (agg_index, agg) in aggregations.iter().enumerate() {
1382        let input_columns = find_column_refs_in_expr(agg, dialect);
1383        if input_columns.is_empty() {
1384            continue;
1385        }
1386        for col_index in (agg_index..output_columns.len()).step_by(aggregations.len()) {
1387            mapping.insert(
1388                normalize_column_name(&output_columns[col_index], dialect),
1389                input_columns.clone(),
1390            );
1391        }
1392    }
1393    mapping
1394}
1395
1396fn pivot_lineage_column_mapping(
1397    pivot: &crate::expressions::Pivot,
1398    scope: &Scope,
1399    dialect: Option<DialectType>,
1400) -> HashMap<String, Vec<SimpleColumnRef>> {
1401    let mut mapping = pivot_column_mapping(pivot, dialect);
1402    let Some(pre_pivot_columns) = pre_pivot_output_columns(&pivot.this, scope) else {
1403        return mapping;
1404    };
1405
1406    let output_columns = pivot_output_columns(pivot, &pre_pivot_columns, dialect);
1407    if output_columns.is_empty() {
1408        return mapping;
1409    }
1410
1411    let base_mapping = mapping.clone();
1412    for (post_name, pre_name) in output_columns {
1413        let normalized_pre = normalize_column_name(&pre_name, dialect);
1414        let normalized_post = normalize_column_name(&post_name, dialect);
1415
1416        if let Some(input_columns) = base_mapping.get(&normalized_pre) {
1417            mapping.insert(normalized_post, input_columns.clone());
1418        } else {
1419            mapping.insert(
1420                normalized_post,
1421                vec![SimpleColumnRef {
1422                    table: None,
1423                    column: pre_name,
1424                }],
1425            );
1426        }
1427    }
1428
1429    mapping
1430}
1431
1432fn pre_pivot_output_columns(source: &Expression, scope: &Scope) -> Option<Vec<String>> {
1433    match source {
1434        Expression::Subquery(subquery) => known_output_columns(&subquery.this),
1435        Expression::Table(table) if table.schema.is_none() && table.catalog.is_none() => scope
1436            .cte_sources
1437            .get(&table.name.name)
1438            .and_then(|source| known_output_columns(&source.expression)),
1439        Expression::Paren(paren) => pre_pivot_output_columns(&paren.this, scope),
1440        _ => None,
1441    }
1442}
1443
1444fn known_output_columns(expression: &Expression) -> Option<Vec<String>> {
1445    let expression = match expression {
1446        Expression::Cte(cte) => &cte.this,
1447        Expression::Subquery(subquery) => &subquery.this,
1448        other => other,
1449    };
1450    let columns = crate::ast_transforms::get_output_column_names(expression);
1451    if columns.is_empty() || columns.iter().any(|column| column == "*") {
1452        None
1453    } else {
1454        Some(columns)
1455    }
1456}
1457
1458fn pivot_output_columns(
1459    pivot: &crate::expressions::Pivot,
1460    pre_pivot_columns: &[String],
1461    dialect: Option<DialectType>,
1462) -> Vec<(String, String)> {
1463    let generated_outputs = pivot_generated_output_columns(pivot, dialect);
1464    let excluded = pivot_excluded_source_columns(pivot, dialect);
1465
1466    if excluded.is_empty() || generated_outputs.is_empty() {
1467        return Vec::new();
1468    }
1469
1470    let mut pre_rename: Vec<String> = pre_pivot_columns
1471        .iter()
1472        .filter(|column| !excluded.contains(&normalize_column_name(column, dialect)))
1473        .cloned()
1474        .collect();
1475    pre_rename.extend(generated_outputs);
1476
1477    let post_rename = if pivot.alias_columns.is_empty() {
1478        pre_rename.clone()
1479    } else {
1480        let mut names: Vec<String> = pivot
1481            .alias_columns
1482            .iter()
1483            .map(|column| column.name.clone())
1484            .collect();
1485        names.extend(pre_rename.iter().skip(names.len()).cloned());
1486        names
1487    };
1488
1489    post_rename.into_iter().zip(pre_rename).collect()
1490}
1491
1492fn pivot_excluded_source_columns(
1493    pivot: &crate::expressions::Pivot,
1494    dialect: Option<DialectType>,
1495) -> HashSet<String> {
1496    pivot
1497        .fields
1498        .iter()
1499        .chain(pivot.expressions.iter())
1500        .chain(pivot.using.iter())
1501        .flat_map(|expr| find_column_refs_in_expr(expr, dialect))
1502        .map(|column| normalize_column_name(&column.column, dialect))
1503        .collect()
1504}
1505
1506fn pivot_generated_output_columns(
1507    pivot: &crate::expressions::Pivot,
1508    _dialect: Option<DialectType>,
1509) -> Vec<String> {
1510    let fields = pivot_field_output_names(pivot);
1511    if fields.is_empty() {
1512        return Vec::new();
1513    }
1514
1515    let aggregations = pivot_aggregation_expressions(pivot);
1516    if aggregations.is_empty() {
1517        return Vec::new();
1518    }
1519
1520    let needs_suffix = aggregations.len() > 1;
1521    let mut outputs = Vec::new();
1522    for field in fields {
1523        for aggregation in aggregations {
1524            if let Some(suffix) = pivot_aggregation_output_suffix(aggregation, needs_suffix) {
1525                outputs.push(format!("{field}_{suffix}"));
1526            } else {
1527                outputs.push(field.clone());
1528            }
1529        }
1530    }
1531    outputs
1532}
1533
1534fn pivot_aggregation_expressions(pivot: &crate::expressions::Pivot) -> &[Expression] {
1535    if pivot.using.is_empty() {
1536        &pivot.expressions
1537    } else {
1538        &pivot.using
1539    }
1540}
1541
1542fn pivot_aggregation_output_suffix(expr: &Expression, needs_suffix: bool) -> Option<String> {
1543    match expr {
1544        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1545        _ if needs_suffix => Generator::sql(expr).ok().map(|sql| sql.to_lowercase()),
1546        _ => None,
1547    }
1548}
1549
1550fn pivot_field_output_names(pivot: &crate::expressions::Pivot) -> Vec<String> {
1551    let mut names = Vec::new();
1552    for field in &pivot.fields {
1553        if let Expression::In(in_expr) = field {
1554            for expr in &in_expr.expressions {
1555                if let Some(name) = pivot_expr_output_name(expr) {
1556                    names.push(name);
1557                }
1558            }
1559        }
1560    }
1561    names
1562}
1563
1564fn pivot_expr_output_name(expr: &Expression) -> Option<String> {
1565    match expr {
1566        Expression::PivotAlias(alias) => pivot_expr_output_name(&alias.alias),
1567        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1568        Expression::Identifier(identifier) => Some(identifier.name.clone()),
1569        Expression::Column(column) => Some(column.name.name.clone()),
1570        Expression::Literal(literal) => Some(literal.value_str().to_string()),
1571        Expression::Var(var) => Some(var.this.clone()),
1572        Expression::Tuple(tuple) => tuple.expressions.first().and_then(pivot_expr_output_name),
1573        _ => None,
1574    }
1575}
1576
1577fn pivot_implicit_source_column(pivot: &crate::expressions::Pivot, col_name: &str) -> bool {
1578    let pivot_columns: HashSet<String> = pivot
1579        .fields
1580        .iter()
1581        .filter_map(|field| match field {
1582            Expression::In(in_expr) => Some(&in_expr.this),
1583            _ => None,
1584        })
1585        .flat_map(|expr| find_column_refs_in_expr(expr, None))
1586        .map(|col| col.column.to_lowercase())
1587        .collect();
1588    let aggregation_columns: HashSet<String> = pivot
1589        .expressions
1590        .iter()
1591        .flat_map(|expr| find_column_refs_in_expr(expr, None))
1592        .map(|col| col.column.to_lowercase())
1593        .collect();
1594
1595    let normalized = col_name.to_lowercase();
1596    !pivot_columns.contains(&normalized) && !aggregation_columns.contains(&normalized)
1597}
1598
1599fn unpivot_column_mapping(
1600    unpivot: &crate::expressions::Unpivot,
1601    dialect: Option<DialectType>,
1602) -> HashMap<String, Vec<SimpleColumnRef>> {
1603    let value_columns: Vec<String> = std::iter::once(unpivot.value_column.name.clone())
1604        .chain(
1605            unpivot
1606                .extra_value_columns
1607                .iter()
1608                .map(|column| column.name.clone()),
1609        )
1610        .collect();
1611    let mut all_input_columns = Vec::new();
1612    let mut value_input_columns: Vec<Vec<SimpleColumnRef>> = vec![Vec::new(); value_columns.len()];
1613
1614    for entry in &unpivot.columns {
1615        let columns = unpivot_entry_columns(entry);
1616        all_input_columns.extend(columns.clone());
1617        if columns.len() == value_columns.len() {
1618            for (idx, col_ref) in columns.into_iter().enumerate() {
1619                value_input_columns[idx].push(col_ref);
1620            }
1621        } else {
1622            for inputs in &mut value_input_columns {
1623                inputs.extend(columns.clone());
1624            }
1625        }
1626    }
1627
1628    let mut mapping = HashMap::new();
1629    mapping.insert(
1630        normalize_column_name(&unpivot.name_column.name, dialect),
1631        all_input_columns.clone(),
1632    );
1633    for (idx, value_column) in value_columns.iter().enumerate() {
1634        mapping.insert(
1635            normalize_column_name(value_column, dialect),
1636            value_input_columns.get(idx).cloned().unwrap_or_default(),
1637        );
1638    }
1639    mapping
1640}
1641
1642fn unpivot_entry_columns(expr: &Expression) -> Vec<SimpleColumnRef> {
1643    match expr {
1644        Expression::PivotAlias(alias) => unpivot_entry_columns(&alias.this),
1645        Expression::Tuple(tuple) => tuple
1646            .expressions
1647            .iter()
1648            .flat_map(unpivot_entry_columns)
1649            .collect(),
1650        Expression::Column(column) => vec![SimpleColumnRef {
1651            table: column.table.clone(),
1652            column: column.name.name.clone(),
1653        }],
1654        Expression::Identifier(identifier) => vec![SimpleColumnRef {
1655            table: None,
1656            column: identifier.name.clone(),
1657        }],
1658        _ => find_column_refs_in_expr(expr, None),
1659    }
1660}
1661
1662fn attach_pivot_input_column(
1663    node: &mut LineageNode,
1664    scope: &Scope,
1665    dialect: Option<DialectType>,
1666    source_expr: &Expression,
1667    col_ref: &SimpleColumnRef,
1668    trim_selects: bool,
1669    all_cte_scopes: &[&Scope],
1670    depth: usize,
1671) {
1672    match source_expr {
1673        Expression::Table(table) => {
1674            let table_name = col_ref
1675                .table
1676                .as_ref()
1677                .map(|identifier| identifier.name.as_str())
1678                .unwrap_or(table.name.name.as_str());
1679            if scope.cte_sources.contains_key(table_name) {
1680                resolve_qualified_column(
1681                    node,
1682                    scope,
1683                    dialect,
1684                    table_name,
1685                    &col_ref.column,
1686                    &node.name.clone(),
1687                    trim_selects,
1688                    all_cte_scopes,
1689                    depth + 1,
1690                );
1691            } else {
1692                let mut source = ScopeSourceInfo::new(
1693                    Expression::Table(Box::new(table.as_ref().clone())),
1694                    false,
1695                    SourceKind::Table,
1696                );
1697                if let Some(alias) = &table.alias {
1698                    source = source.with_alias(alias.name.clone());
1699                }
1700                let source_key = table
1701                    .alias
1702                    .as_ref()
1703                    .map(|alias| alias.name.as_str())
1704                    .unwrap_or(table.name.name.as_str());
1705                node.downstream.push(make_table_column_node_from_source(
1706                    source_key,
1707                    &col_ref.column,
1708                    &source,
1709                ));
1710            }
1711        }
1712        Expression::Subquery(subquery) => {
1713            let source_scope = build_scope(&subquery.this);
1714            let child = if let Some(table) = &col_ref.table {
1715                let mut child_node = LineageNode::new(
1716                    &col_ref.column,
1717                    subquery.this.clone(),
1718                    subquery.this.clone(),
1719                );
1720                resolve_qualified_column(
1721                    &mut child_node,
1722                    &source_scope,
1723                    dialect,
1724                    &table.name,
1725                    &col_ref.column,
1726                    &node.name.clone(),
1727                    trim_selects,
1728                    all_cte_scopes,
1729                    depth + 1,
1730                );
1731                Ok(child_node)
1732            } else {
1733                to_node_inner(
1734                    ColumnRef::Name(&col_ref.column),
1735                    &source_scope,
1736                    dialect,
1737                    "",
1738                    "",
1739                    "",
1740                    trim_selects,
1741                    &all_cte_scopes
1742                        .iter()
1743                        .map(|scope| (*scope).clone())
1744                        .collect::<Vec<_>>(),
1745                    depth + 1,
1746                )
1747            };
1748            if let Ok(child) = child {
1749                node.downstream.push(child);
1750            }
1751        }
1752        Expression::Paren(paren) => attach_pivot_input_column(
1753            node,
1754            scope,
1755            dialect,
1756            &paren.this,
1757            col_ref,
1758            trim_selects,
1759            all_cte_scopes,
1760            depth,
1761        ),
1762        _ => {
1763            if let Some(table) = &col_ref.table {
1764                resolve_qualified_column(
1765                    node,
1766                    scope,
1767                    dialect,
1768                    &table.name,
1769                    &col_ref.column,
1770                    &node.name.clone(),
1771                    trim_selects,
1772                    all_cte_scopes,
1773                    depth + 1,
1774                );
1775            } else {
1776                node.downstream
1777                    .push(make_table_column_node("_", &col_ref.column));
1778            }
1779        }
1780    }
1781}
1782
1783/// Resolve a FROM alias to the original CTE name.
1784///
1785/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
1786/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
1787/// This function checks if `name` is such an alias and returns the CTE name.
1788fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1789    // If it's already a known CTE name, no resolution needed
1790    if scope.cte_sources.contains_key(name) {
1791        return None;
1792    }
1793    // Check if the source's expression is a CTE — if so, extract the CTE name
1794    if let Some(source_info) = scope.sources.get(name) {
1795        if source_info.is_scope {
1796            if let Expression::Cte(cte) = &source_info.expression {
1797                let cte_name = &cte.alias.name;
1798                if scope.cte_sources.contains_key(cte_name) {
1799                    return Some(cte_name.clone());
1800                }
1801            }
1802        }
1803    }
1804    None
1805}
1806
1807fn resolve_unqualified_column(
1808    node: &mut LineageNode,
1809    scope: &Scope,
1810    dialect: Option<DialectType>,
1811    col_name: &str,
1812    parent_name: &str,
1813    trim_selects: bool,
1814    all_cte_scopes: &[&Scope],
1815    depth: usize,
1816) {
1817    // Try to find which source this column belongs to.
1818    // Build the source list from the actual FROM/JOIN clauses to avoid
1819    // mixing in CTE definitions that are in scope but not referenced.
1820    let from_source_names = source_names_from_from_join(scope);
1821
1822    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1823        resolve_qualified_column(
1824            node,
1825            scope,
1826            dialect,
1827            &tbl,
1828            col_name,
1829            parent_name,
1830            trim_selects,
1831            all_cte_scopes,
1832            depth,
1833        );
1834        return;
1835    }
1836
1837    if from_source_names.len() == 1 {
1838        let tbl = &from_source_names[0];
1839        resolve_qualified_column(
1840            node,
1841            scope,
1842            dialect,
1843            tbl,
1844            col_name,
1845            parent_name,
1846            trim_selects,
1847            all_cte_scopes,
1848            depth,
1849        );
1850        return;
1851    }
1852
1853    // Multiple sources — can't resolve without schema info, add unqualified node
1854    let child = LineageNode::new(
1855        col_name.to_string(),
1856        Expression::Column(Box::new(crate::expressions::Column {
1857            name: crate::expressions::Identifier::new(col_name.to_string()),
1858            table: None,
1859            join_mark: false,
1860            trailing_comments: vec![],
1861            span: None,
1862            inferred_type: None,
1863        })),
1864        node.source.clone(),
1865    );
1866    node.downstream.push(child);
1867}
1868
1869fn unique_virtual_source_for_column(
1870    scope: &Scope,
1871    source_names: &[String],
1872    col_name: &str,
1873) -> Option<String> {
1874    let mut matches = source_names.iter().filter_map(|source_name| {
1875        let source = scope.sources.get(source_name)?;
1876        if source.kind == SourceKind::Virtual
1877            && virtual_source_output_columns(source)
1878                .any(|column| column.eq_ignore_ascii_case(col_name))
1879        {
1880            Some(source_name.clone())
1881        } else {
1882            None
1883        }
1884    });
1885
1886    let first = matches.next()?;
1887    if matches.next().is_none() {
1888        Some(first)
1889    } else {
1890        None
1891    }
1892}
1893
1894fn virtual_source_output_columns(
1895    source_info: &ScopeSourceInfo,
1896) -> Box<dyn Iterator<Item = String> + '_> {
1897    match &source_info.expression {
1898        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1899        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1900            Box::new(alias_output_columns(alias))
1901        }
1902        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1903        Expression::LateralView(lateral_view) => {
1904            Box::new(lateral_view_output_columns(lateral_view))
1905        }
1906        _ => Box::new(source_info.alias.clone().into_iter()),
1907    }
1908}
1909
1910fn unnest_output_columns(
1911    unnest: &crate::expressions::UnnestFunc,
1912) -> impl Iterator<Item = String> + '_ {
1913    unnest
1914        .alias
1915        .iter()
1916        .map(|alias| alias.name.clone())
1917        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1918}
1919
1920fn alias_output_columns(
1921    alias: &crate::expressions::Alias,
1922) -> Box<dyn Iterator<Item = String> + '_> {
1923    if alias.column_aliases.is_empty() {
1924        Box::new(std::iter::once(alias.alias.name.clone()))
1925    } else {
1926        Box::new(
1927            alias
1928                .column_aliases
1929                .iter()
1930                .map(|column| column.name.clone()),
1931        )
1932    }
1933}
1934
1935fn lateral_output_columns(
1936    lateral: &crate::expressions::Lateral,
1937) -> Box<dyn Iterator<Item = String> + '_> {
1938    if lateral.column_aliases.is_empty() {
1939        default_virtual_output_columns(&lateral.this)
1940    } else {
1941        Box::new(lateral.column_aliases.iter().cloned())
1942    }
1943}
1944
1945fn lateral_view_output_columns(
1946    lateral_view: &crate::expressions::LateralView,
1947) -> Box<dyn Iterator<Item = String> + '_> {
1948    Box::new(
1949        lateral_view
1950            .column_aliases
1951            .iter()
1952            .map(|column| column.name.clone()),
1953    )
1954}
1955
1956fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1957    match expr {
1958        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1959        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1960            alias_output_columns(alias)
1961        }
1962        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1963            Box::new(
1964                ["seq", "key", "path", "index", "value", "this"]
1965                    .into_iter()
1966                    .map(String::from),
1967            )
1968        }
1969        _ => Box::new(std::iter::empty()),
1970    }
1971}
1972
1973fn attach_virtual_source_dependencies(
1974    node: &mut LineageNode,
1975    scope: &Scope,
1976    dialect: Option<DialectType>,
1977    source_alias: &str,
1978    source_expr: &Expression,
1979    trim_selects: bool,
1980    all_cte_scopes: &[&Scope],
1981    depth: usize,
1982) {
1983    let parent_name = node.name.clone();
1984    let mut seen = HashSet::new();
1985    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1986        let key = (
1987            col_ref.table.as_ref().map(|t| t.name.clone()),
1988            col_ref.column.clone(),
1989        );
1990        if !seen.insert(key) {
1991            continue;
1992        }
1993
1994        if let Some(table_id) = col_ref.table {
1995            let table = table_id.name;
1996            if table == source_alias {
1997                continue;
1998            }
1999            resolve_qualified_column(
2000                node,
2001                scope,
2002                dialect,
2003                &table,
2004                &col_ref.column,
2005                &parent_name,
2006                trim_selects,
2007                all_cte_scopes,
2008                depth + 1,
2009            );
2010        } else {
2011            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
2012            if non_virtual_sources.len() == 1 {
2013                resolve_qualified_column(
2014                    node,
2015                    scope,
2016                    dialect,
2017                    &non_virtual_sources[0],
2018                    &col_ref.column,
2019                    &parent_name,
2020                    trim_selects,
2021                    all_cte_scopes,
2022                    depth + 1,
2023                );
2024            }
2025        }
2026    }
2027}
2028
2029fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
2030    fn source_name(expr: &Expression) -> Option<String> {
2031        match expr {
2032            Expression::Table(table) => Some(
2033                table
2034                    .alias
2035                    .as_ref()
2036                    .map(|a| a.name.clone())
2037                    .unwrap_or_else(|| table.name.name.clone()),
2038            ),
2039            Expression::Subquery(subquery) => {
2040                subquery.alias.as_ref().map(|alias| alias.name.clone())
2041            }
2042            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
2043            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
2044                Some(alias.alias.name.clone())
2045            }
2046            Expression::Alias(alias) if is_query_like_relation(&alias.this) => {
2047                Some(alias.alias.name.clone())
2048            }
2049            Expression::Lateral(lateral) => lateral.alias.clone(),
2050            Expression::LateralView(lateral_view) => lateral_view
2051                .table_alias
2052                .as_ref()
2053                .or_else(|| lateral_view.column_aliases.first())
2054                .map(|alias| alias.name.clone()),
2055            Expression::Pivot(pivot) => Some(pivot_lineage_source_name(
2056                &pivot.this,
2057                pivot.alias.as_ref().map(|alias| alias.name.as_str()),
2058            )),
2059            Expression::Unpivot(unpivot) => Some(pivot_lineage_source_name(
2060                &unpivot.this,
2061                unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
2062            )),
2063            Expression::Paren(paren) => source_name(&paren.this),
2064            _ => None,
2065        }
2066    }
2067
2068    let effective_expr = match &scope.expression {
2069        Expression::Cte(cte) => &cte.this,
2070        expr => expr,
2071    };
2072
2073    let mut names = Vec::new();
2074    let mut seen = std::collections::HashSet::new();
2075
2076    if let Expression::Select(select) = effective_expr {
2077        if let Some(from) = &select.from {
2078            for expr in &from.expressions {
2079                if let Some(name) = source_name(expr) {
2080                    if !name.is_empty() && seen.insert(name.clone()) {
2081                        names.push(name);
2082                    }
2083                }
2084            }
2085        }
2086        for join in &select.joins {
2087            if is_semi_or_anti_join_kind(join.kind) {
2088                continue;
2089            }
2090            if let Some(name) = source_name(&join.this) {
2091                if !name.is_empty() && seen.insert(name.clone()) {
2092                    names.push(name);
2093                }
2094            }
2095        }
2096        for lateral_view in &select.lateral_views {
2097            if let Some(name) =
2098                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
2099            {
2100                if !name.is_empty() && seen.insert(name.clone()) {
2101                    names.push(name);
2102                }
2103            }
2104        }
2105    }
2106
2107    names
2108}
2109
2110fn is_semi_or_anti_join_kind(kind: JoinKind) -> bool {
2111    matches!(
2112        kind,
2113        JoinKind::Semi
2114            | JoinKind::Anti
2115            | JoinKind::LeftSemi
2116            | JoinKind::LeftAnti
2117            | JoinKind::RightSemi
2118            | JoinKind::RightAnti
2119    )
2120}
2121
2122fn is_query_like_relation(expr: &Expression) -> bool {
2123    match expr {
2124        Expression::Select(_)
2125        | Expression::Subquery(_)
2126        | Expression::Union(_)
2127        | Expression::Intersect(_)
2128        | Expression::Except(_) => true,
2129        Expression::Paren(paren) => is_query_like_relation(&paren.this),
2130        _ => false,
2131    }
2132}
2133
2134fn derived_source_query(expr: &Expression) -> Option<&Expression> {
2135    match expr {
2136        Expression::Subquery(subquery) => Some(&subquery.this),
2137        Expression::Alias(alias) if is_query_like_relation(&alias.this) => Some(&alias.this),
2138        Expression::Select(_)
2139        | Expression::Union(_)
2140        | Expression::Intersect(_)
2141        | Expression::Except(_) => Some(expr),
2142        Expression::Paren(paren) => derived_source_query(&paren.this),
2143        _ => None,
2144    }
2145}
2146
2147fn expressions_equivalent_after_wrappers(left: &Expression, right: &Expression) -> bool {
2148    left == right || effective_scope_expression(left) == effective_scope_expression(right)
2149}
2150
2151fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
2152    source_names_from_from_join(scope)
2153        .into_iter()
2154        .filter(|name| {
2155            !matches!(
2156                scope.sources.get(name).map(|source| source.kind),
2157                Some(SourceKind::Virtual)
2158            )
2159        })
2160        .collect()
2161}
2162
2163// ---------------------------------------------------------------------------
2164// Helper functions
2165// ---------------------------------------------------------------------------
2166
2167/// Get the alias or name of an expression
2168fn get_alias_or_name(expr: &Expression) -> Option<String> {
2169    match expr {
2170        Expression::Alias(alias) => Some(alias.alias.name.clone()),
2171        Expression::Column(col) => Some(col.name.name.clone()),
2172        Expression::Identifier(id) => Some(id.name.clone()),
2173        Expression::Star(_) => Some("*".to_string()),
2174        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
2175        // Unwrap to get the actual column/alias name from the inner expression.
2176        Expression::Annotated(a) => get_alias_or_name(&a.this),
2177        _ => None,
2178    }
2179}
2180
2181fn find_prior_select_alias_expr(
2182    scope_expr: &Expression,
2183    target_expr: &Expression,
2184    alias_name: &str,
2185    dialect: Option<DialectType>,
2186) -> Option<Expression> {
2187    let Expression::Select(select) = scope_expr else {
2188        return None;
2189    };
2190
2191    let normalized_alias = normalize_column_name(alias_name, dialect);
2192    for expr in &select.expressions {
2193        if expr == target_expr {
2194            return None;
2195        }
2196
2197        if let Expression::Alias(alias) = expr {
2198            if normalize_column_name(&alias.alias.name, dialect) == normalized_alias {
2199                return Some(alias.this.clone());
2200            }
2201        }
2202    }
2203
2204    None
2205}
2206
2207/// Resolve the display name for a column reference.
2208fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
2209    match column {
2210        ColumnRef::Name(n) => n.to_string(),
2211        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
2212    }
2213}
2214
2215/// Find the select expression matching a column reference.
2216fn find_select_expr(
2217    scope_expr: &Expression,
2218    column: &ColumnRef<'_>,
2219    dialect: Option<DialectType>,
2220) -> Result<Expression> {
2221    if let Expression::Select(ref select) = scope_expr {
2222        match column {
2223            ColumnRef::Name(name) => {
2224                let normalized_name = normalize_column_name(name, dialect);
2225                for expr in &select.expressions {
2226                    if let Some(alias_or_name) = get_alias_or_name(expr) {
2227                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2228                            return Ok(expr.clone());
2229                        }
2230                    }
2231                }
2232                if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
2233                    return Ok(expr);
2234                }
2235                Err(crate::error::Error::parse(
2236                    format!("Cannot find column '{}' in query", name),
2237                    0,
2238                    0,
2239                    0,
2240                    0,
2241                ))
2242            }
2243            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
2244                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
2245            }),
2246        }
2247    } else {
2248        Err(crate::error::Error::parse(
2249            "Expected SELECT expression for column lookup",
2250            0,
2251            0,
2252            0,
2253            0,
2254        ))
2255    }
2256}
2257
2258fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
2259    let sources = get_select_sources(select);
2260    if sources.is_empty() {
2261        return None;
2262    }
2263
2264    let mut candidate_aliases = Vec::new();
2265    let mut seen = HashSet::new();
2266
2267    for expr in &select.expressions {
2268        let aliases = match star_passthrough_source_aliases(expr, &sources) {
2269            StarPassthroughSources::None => continue,
2270            StarPassthroughSources::Ambiguous => return None,
2271            StarPassthroughSources::Aliases(aliases) => aliases,
2272        };
2273
2274        for alias in aliases {
2275            if seen.insert(alias.clone()) {
2276                candidate_aliases.push(alias);
2277            }
2278        }
2279    }
2280
2281    match candidate_aliases.as_slice() {
2282        [alias] => {
2283            let table = Identifier::new(alias.clone());
2284            Some(make_column_expr(name, Some(&table)))
2285        }
2286        _ => None,
2287    }
2288}
2289
2290enum StarPassthroughSources {
2291    None,
2292    Ambiguous,
2293    Aliases(Vec<String>),
2294}
2295
2296fn star_passthrough_source_aliases(
2297    expr: &Expression,
2298    sources: &[SourceInfo],
2299) -> StarPassthroughSources {
2300    match expr {
2301        Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
2302        Expression::Column(column) if column.name.name == "*" => {
2303            star_source_aliases(column.table.as_ref(), sources)
2304        }
2305        Expression::Annotated(annotated) => {
2306            star_passthrough_source_aliases(&annotated.this, sources)
2307        }
2308        _ => StarPassthroughSources::None,
2309    }
2310}
2311
2312fn star_source_aliases(
2313    qualifier: Option<&Identifier>,
2314    sources: &[SourceInfo],
2315) -> StarPassthroughSources {
2316    if let Some(qualifier) = qualifier {
2317        let mut aliases = Vec::new();
2318
2319        for source in sources {
2320            if source_matches_star_qualifier(source, qualifier) {
2321                aliases.push(source.alias.clone());
2322            }
2323        }
2324
2325        return match aliases.len() {
2326            0 => StarPassthroughSources::None,
2327            1 => StarPassthroughSources::Aliases(aliases),
2328            _ => StarPassthroughSources::Ambiguous,
2329        };
2330    }
2331
2332    match sources {
2333        // Do not synthesize a source column for unresolved quoted table stars.
2334        // This keeps quoted CTE/table case semantics intact while still allowing
2335        // the schema-less fallback for common unquoted SELECT * passthroughs.
2336        [source] if source.quoted => StarPassthroughSources::None,
2337        [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
2338        [] => StarPassthroughSources::None,
2339        _ => StarPassthroughSources::Ambiguous,
2340    }
2341}
2342
2343fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
2344    if source.normalized == normalize_cte_name(qualifier) {
2345        return true;
2346    }
2347
2348    if qualifier.quoted {
2349        source.alias == qualifier.name
2350    } else {
2351        source.alias.eq_ignore_ascii_case(&qualifier.name)
2352    }
2353}
2354
2355/// Find the positional index of a column name in a set operation's first SELECT branch.
2356fn column_to_index(
2357    set_op_expr: &Expression,
2358    name: &str,
2359    dialect: Option<DialectType>,
2360) -> Result<usize> {
2361    let normalized_name = normalize_column_name(name, dialect);
2362    let mut expr = set_op_expr;
2363    loop {
2364        match expr {
2365            Expression::Union(u) => expr = &u.left,
2366            Expression::Intersect(i) => expr = &i.left,
2367            Expression::Except(e) => expr = &e.left,
2368            Expression::Subquery(subquery) => expr = &subquery.this,
2369            Expression::Cte(cte) => expr = &cte.this,
2370            Expression::Paren(paren) => expr = &paren.this,
2371            Expression::Select(select) => {
2372                for (i, e) in select.expressions.iter().enumerate() {
2373                    if let Some(alias_or_name) = get_alias_or_name(e) {
2374                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2375                            return Ok(i);
2376                        }
2377                    }
2378                }
2379                return Err(crate::error::Error::parse(
2380                    format!("Cannot find column '{}' in set operation", name),
2381                    0,
2382                    0,
2383                    0,
2384                    0,
2385                ));
2386            }
2387            _ => {
2388                return Err(crate::error::Error::parse(
2389                    "Expected SELECT or set operation",
2390                    0,
2391                    0,
2392                    0,
2393                    0,
2394                ))
2395            }
2396        }
2397    }
2398}
2399
2400fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
2401    normalize_name(name, dialect, false, true)
2402}
2403
2404/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
2405fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
2406    if let Expression::Select(select) = select_expr {
2407        let mut trimmed = select.as_ref().clone();
2408        trimmed.expressions = vec![target_expr.clone()];
2409        Expression::Select(Box::new(trimmed))
2410    } else {
2411        select_expr.clone()
2412    }
2413}
2414
2415/// Find the child scope (CTE or derived table) for a given source name.
2416fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
2417    // Check CTE scopes
2418    if scope.cte_sources.contains_key(source_name) {
2419        for cte_scope in &scope.cte_scopes {
2420            if let Expression::Cte(cte) = &cte_scope.expression {
2421                if cte.alias.name == source_name {
2422                    return Some(cte_scope);
2423                }
2424            }
2425        }
2426    }
2427
2428    // Check derived table scopes
2429    if let Some(source_info) = scope.sources.get(source_name) {
2430        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
2431            if let Some(query) = derived_source_query(&source_info.expression) {
2432                for dt_scope in &scope.derived_table_scopes {
2433                    if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2434                        return Some(dt_scope);
2435                    }
2436                }
2437            }
2438        }
2439    }
2440
2441    None
2442}
2443
2444/// Find a CTE scope by name, searching through a combined list of CTE scopes.
2445/// This handles nested CTEs where the current scope doesn't have the CTE scope
2446/// as a direct child but knows about it via cte_sources.
2447fn find_child_scope_in<'a>(
2448    all_cte_scopes: &[&'a Scope],
2449    scope: &'a Scope,
2450    source_name: &str,
2451) -> Option<&'a Scope> {
2452    // First try the scope's own cte_scopes
2453    for cte_scope in &scope.cte_scopes {
2454        if let Expression::Cte(cte) = &cte_scope.expression {
2455            if cte.alias.name == source_name {
2456                return Some(cte_scope);
2457            }
2458        }
2459    }
2460
2461    // Then search through all ancestor CTE scopes
2462    for cte_scope in all_cte_scopes {
2463        if let Expression::Cte(cte) = &cte_scope.expression {
2464            if cte.alias.name == source_name {
2465                return Some(cte_scope);
2466            }
2467        }
2468    }
2469
2470    // Fall back to derived table scopes
2471    if let Some(source_info) = scope.sources.get(source_name) {
2472        if source_info.is_scope {
2473            if let Some(query) = derived_source_query(&source_info.expression) {
2474                for dt_scope in &scope.derived_table_scopes {
2475                    if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2476                        return Some(dt_scope);
2477                    }
2478                }
2479            }
2480        }
2481    }
2482
2483    None
2484}
2485
2486/// Create a terminal lineage node for a table.column reference.
2487fn make_table_column_node(table: &str, column: &str) -> LineageNode {
2488    let mut node = LineageNode::new(
2489        format!("{}.{}", table, column),
2490        Expression::Column(Box::new(crate::expressions::Column {
2491            name: crate::expressions::Identifier::new(column.to_string()),
2492            table: Some(crate::expressions::Identifier::new(table.to_string())),
2493            join_mark: false,
2494            trailing_comments: vec![],
2495            span: None,
2496            inferred_type: None,
2497        })),
2498        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
2499    );
2500    node.source_name = table.to_string();
2501    node.source_kind = SourceKind::Table;
2502    node
2503}
2504
2505fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
2506    let mut parts: Vec<String> = Vec::new();
2507    if let Some(catalog) = &table_ref.catalog {
2508        parts.push(catalog.name.clone());
2509    }
2510    if let Some(schema) = &table_ref.schema {
2511        parts.push(schema.name.clone());
2512    }
2513    parts.push(table_ref.name.name.clone());
2514    parts.join(".")
2515}
2516
2517fn apply_source_info_context(
2518    node: &mut LineageNode,
2519    source_key: &str,
2520    source_info: &ScopeSourceInfo,
2521) {
2522    node.source_kind = source_info.kind;
2523    node.source_name =
2524        source_info
2525            .lineage_name
2526            .clone()
2527            .unwrap_or_else(|| match &source_info.expression {
2528                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
2529                _ => source_key.to_string(),
2530            });
2531    node.source_alias = source_info.alias.clone();
2532}
2533
2534fn make_table_column_node_from_source(
2535    source_key: &str,
2536    column: &str,
2537    source_info: &ScopeSourceInfo,
2538) -> LineageNode {
2539    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
2540    let mut node = LineageNode::new(
2541        format!("{}.{}", lineage_name, column),
2542        Expression::Column(Box::new(crate::expressions::Column {
2543            name: crate::expressions::Identifier::new(column.to_string()),
2544            table: Some(crate::expressions::Identifier::new(
2545                lineage_name.to_string(),
2546            )),
2547            join_mark: false,
2548            trailing_comments: vec![],
2549            span: None,
2550            inferred_type: None,
2551        })),
2552        source_info.expression.clone(),
2553    );
2554
2555    apply_source_info_context(&mut node, source_key, source_info);
2556
2557    node
2558}
2559
2560/// Simple column reference extracted from an expression
2561#[derive(Debug, Clone)]
2562struct SimpleColumnRef {
2563    table: Option<crate::expressions::Identifier>,
2564    column: String,
2565}
2566
2567/// Find all column references in an expression (does not recurse into subqueries).
2568fn find_column_refs_in_expr(
2569    expr: &Expression,
2570    dialect: Option<DialectType>,
2571) -> Vec<SimpleColumnRef> {
2572    let mut refs = Vec::new();
2573    collect_column_refs(expr, dialect, &mut refs, None);
2574    refs
2575}
2576
2577fn find_column_refs_in_expr_with_select(
2578    expr: &Expression,
2579    select_expr: &Expression,
2580    dialect: Option<DialectType>,
2581) -> Vec<SimpleColumnRef> {
2582    let named_windows = match select_expr {
2583        Expression::Select(select) => select.windows.as_deref(),
2584        _ => None,
2585    };
2586    let mut refs = Vec::new();
2587    collect_column_refs(expr, dialect, &mut refs, named_windows);
2588    refs
2589}
2590
2591fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
2592    match expr {
2593        Expression::Column(col) => {
2594            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
2595        }
2596        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
2597        _ => false,
2598    }
2599}
2600
2601fn collect_column_refs(
2602    expr: &Expression,
2603    dialect: Option<DialectType>,
2604    refs: &mut Vec<SimpleColumnRef>,
2605    named_windows: Option<&[NamedWindow]>,
2606) {
2607    let mut stack: Vec<&Expression> = vec![expr];
2608
2609    while let Some(current) = stack.pop() {
2610        match current {
2611            // === Leaf: collect Column references ===
2612            Expression::Column(col) => {
2613                refs.push(SimpleColumnRef {
2614                    table: col.table.clone(),
2615                    column: col.name.name.clone(),
2616                });
2617            }
2618
2619            // === Boundary: don't recurse into subqueries (handled separately) ===
2620            Expression::Subquery(_) | Expression::Exists(_) => {}
2621
2622            // === BinaryOp variants: left, right ===
2623            Expression::And(op)
2624            | Expression::Or(op)
2625            | Expression::Eq(op)
2626            | Expression::Neq(op)
2627            | Expression::Lt(op)
2628            | Expression::Lte(op)
2629            | Expression::Gt(op)
2630            | Expression::Gte(op)
2631            | Expression::Add(op)
2632            | Expression::Sub(op)
2633            | Expression::Mul(op)
2634            | Expression::Div(op)
2635            | Expression::Mod(op)
2636            | Expression::BitwiseAnd(op)
2637            | Expression::BitwiseOr(op)
2638            | Expression::BitwiseXor(op)
2639            | Expression::BitwiseLeftShift(op)
2640            | Expression::BitwiseRightShift(op)
2641            | Expression::Concat(op)
2642            | Expression::Adjacent(op)
2643            | Expression::TsMatch(op)
2644            | Expression::PropertyEQ(op)
2645            | Expression::ArrayContainsAll(op)
2646            | Expression::ArrayContainedBy(op)
2647            | Expression::ArrayOverlaps(op)
2648            | Expression::JSONBContainsAllTopKeys(op)
2649            | Expression::JSONBContainsAnyTopKeys(op)
2650            | Expression::JSONBDeleteAtPath(op)
2651            | Expression::ExtendsLeft(op)
2652            | Expression::ExtendsRight(op)
2653            | Expression::Is(op)
2654            | Expression::MemberOf(op)
2655            | Expression::NullSafeEq(op)
2656            | Expression::NullSafeNeq(op)
2657            | Expression::Glob(op)
2658            | Expression::Match(op) => {
2659                stack.push(&op.left);
2660                stack.push(&op.right);
2661            }
2662
2663            // === UnaryOp variants: this ===
2664            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
2665                stack.push(&u.this);
2666            }
2667
2668            // === UnaryFunc variants: this ===
2669            Expression::Upper(f)
2670            | Expression::Lower(f)
2671            | Expression::Length(f)
2672            | Expression::LTrim(f)
2673            | Expression::RTrim(f)
2674            | Expression::Reverse(f)
2675            | Expression::Abs(f)
2676            | Expression::Sqrt(f)
2677            | Expression::Cbrt(f)
2678            | Expression::Ln(f)
2679            | Expression::Exp(f)
2680            | Expression::Sign(f)
2681            | Expression::Date(f)
2682            | Expression::Time(f)
2683            | Expression::DateFromUnixDate(f)
2684            | Expression::UnixDate(f)
2685            | Expression::UnixSeconds(f)
2686            | Expression::UnixMillis(f)
2687            | Expression::UnixMicros(f)
2688            | Expression::TimeStrToDate(f)
2689            | Expression::DateToDi(f)
2690            | Expression::DiToDate(f)
2691            | Expression::TsOrDiToDi(f)
2692            | Expression::TsOrDsToDatetime(f)
2693            | Expression::TsOrDsToTimestamp(f)
2694            | Expression::YearOfWeek(f)
2695            | Expression::YearOfWeekIso(f)
2696            | Expression::Initcap(f)
2697            | Expression::Ascii(f)
2698            | Expression::Chr(f)
2699            | Expression::Soundex(f)
2700            | Expression::ByteLength(f)
2701            | Expression::Hex(f)
2702            | Expression::LowerHex(f)
2703            | Expression::Unicode(f)
2704            | Expression::Radians(f)
2705            | Expression::Degrees(f)
2706            | Expression::Sin(f)
2707            | Expression::Cos(f)
2708            | Expression::Tan(f)
2709            | Expression::Asin(f)
2710            | Expression::Acos(f)
2711            | Expression::Atan(f)
2712            | Expression::IsNan(f)
2713            | Expression::IsInf(f)
2714            | Expression::ArrayLength(f)
2715            | Expression::ArraySize(f)
2716            | Expression::Cardinality(f)
2717            | Expression::ArrayReverse(f)
2718            | Expression::ArrayDistinct(f)
2719            | Expression::ArrayFlatten(f)
2720            | Expression::ArrayCompact(f)
2721            | Expression::Explode(f)
2722            | Expression::ExplodeOuter(f)
2723            | Expression::ToArray(f)
2724            | Expression::MapFromEntries(f)
2725            | Expression::MapKeys(f)
2726            | Expression::MapValues(f)
2727            | Expression::JsonArrayLength(f)
2728            | Expression::JsonKeys(f)
2729            | Expression::JsonType(f)
2730            | Expression::ParseJson(f)
2731            | Expression::ToJson(f)
2732            | Expression::Typeof(f)
2733            | Expression::BitwiseCount(f)
2734            | Expression::Year(f)
2735            | Expression::Month(f)
2736            | Expression::Day(f)
2737            | Expression::Hour(f)
2738            | Expression::Minute(f)
2739            | Expression::Second(f)
2740            | Expression::DayOfWeek(f)
2741            | Expression::DayOfWeekIso(f)
2742            | Expression::DayOfMonth(f)
2743            | Expression::DayOfYear(f)
2744            | Expression::WeekOfYear(f)
2745            | Expression::Quarter(f)
2746            | Expression::Epoch(f)
2747            | Expression::EpochMs(f)
2748            | Expression::TimeStrToUnix(f)
2749            | Expression::SHA(f)
2750            | Expression::SHA1Digest(f)
2751            | Expression::TimeToUnix(f)
2752            | Expression::JSONBool(f)
2753            | Expression::Int64(f)
2754            | Expression::MD5NumberLower64(f)
2755            | Expression::MD5NumberUpper64(f)
2756            | Expression::DateStrToDate(f)
2757            | Expression::DateToDateStr(f) => {
2758                stack.push(&f.this);
2759            }
2760
2761            // === BinaryFunc variants: this, expression ===
2762            Expression::Power(f)
2763            | Expression::NullIf(f)
2764            | Expression::IfNull(f)
2765            | Expression::Nvl(f)
2766            | Expression::UnixToTimeStr(f)
2767            | Expression::Contains(f)
2768            | Expression::StartsWith(f)
2769            | Expression::EndsWith(f)
2770            | Expression::Levenshtein(f)
2771            | Expression::ModFunc(f)
2772            | Expression::Atan2(f)
2773            | Expression::IntDiv(f)
2774            | Expression::AddMonths(f)
2775            | Expression::MonthsBetween(f)
2776            | Expression::NextDay(f)
2777            | Expression::ArrayContains(f)
2778            | Expression::ArrayPosition(f)
2779            | Expression::ArrayAppend(f)
2780            | Expression::ArrayPrepend(f)
2781            | Expression::ArrayUnion(f)
2782            | Expression::ArrayExcept(f)
2783            | Expression::ArrayRemove(f)
2784            | Expression::StarMap(f)
2785            | Expression::MapFromArrays(f)
2786            | Expression::MapContainsKey(f)
2787            | Expression::ElementAt(f)
2788            | Expression::JsonMergePatch(f)
2789            | Expression::JSONBContains(f)
2790            | Expression::JSONBExtract(f) => {
2791                stack.push(&f.this);
2792                stack.push(&f.expression);
2793            }
2794
2795            // === VarArgFunc variants: expressions ===
2796            Expression::Greatest(f)
2797            | Expression::Least(f)
2798            | Expression::Coalesce(f)
2799            | Expression::ArrayConcat(f)
2800            | Expression::ArrayIntersect(f)
2801            | Expression::ArrayZip(f)
2802            | Expression::MapConcat(f)
2803            | Expression::JsonArray(f) => {
2804                for e in &f.expressions {
2805                    stack.push(e);
2806                }
2807            }
2808
2809            // === AggFunc variants: this, filter, having_max, limit ===
2810            Expression::Sum(f)
2811            | Expression::Avg(f)
2812            | Expression::Min(f)
2813            | Expression::Max(f)
2814            | Expression::ArrayAgg(f)
2815            | Expression::CountIf(f)
2816            | Expression::Stddev(f)
2817            | Expression::StddevPop(f)
2818            | Expression::StddevSamp(f)
2819            | Expression::Variance(f)
2820            | Expression::VarPop(f)
2821            | Expression::VarSamp(f)
2822            | Expression::Median(f)
2823            | Expression::Mode(f)
2824            | Expression::First(f)
2825            | Expression::Last(f)
2826            | Expression::AnyValue(f)
2827            | Expression::ApproxDistinct(f)
2828            | Expression::ApproxCountDistinct(f)
2829            | Expression::LogicalAnd(f)
2830            | Expression::LogicalOr(f)
2831            | Expression::Skewness(f)
2832            | Expression::ArrayConcatAgg(f)
2833            | Expression::ArrayUniqueAgg(f)
2834            | Expression::BoolXorAgg(f)
2835            | Expression::BitwiseAndAgg(f)
2836            | Expression::BitwiseOrAgg(f)
2837            | Expression::BitwiseXorAgg(f) => {
2838                stack.push(&f.this);
2839                if let Some(ref filter) = f.filter {
2840                    stack.push(filter);
2841                }
2842                if let Some((ref expr, _)) = f.having_max {
2843                    stack.push(expr);
2844                }
2845                if let Some(ref limit) = f.limit {
2846                    stack.push(limit);
2847                }
2848            }
2849
2850            // === Generic Function / AggregateFunction: args ===
2851            Expression::Function(func) => {
2852                for arg in &func.args {
2853                    stack.push(arg);
2854                }
2855            }
2856            Expression::AggregateFunction(func) => {
2857                for arg in &func.args {
2858                    stack.push(arg);
2859                }
2860                if let Some(ref filter) = func.filter {
2861                    stack.push(filter);
2862                }
2863                if let Some(ref limit) = func.limit {
2864                    stack.push(limit);
2865                }
2866            }
2867
2868            // === WindowFunction: this (skip Over for lineage purposes) ===
2869            Expression::WindowFunction(wf) => {
2870                stack.push(&wf.this);
2871                for e in &wf.over.partition_by {
2872                    stack.push(e);
2873                }
2874                for e in &wf.over.order_by {
2875                    stack.push(&e.this);
2876                }
2877                if let Some(keep) = &wf.keep {
2878                    for e in &keep.order_by {
2879                        stack.push(&e.this);
2880                    }
2881                }
2882                if let (Some(window_name), Some(named_windows)) =
2883                    (&wf.over.window_name, named_windows)
2884                {
2885                    for named_window in named_windows {
2886                        if named_window
2887                            .name
2888                            .name
2889                            .eq_ignore_ascii_case(&window_name.name)
2890                        {
2891                            for e in &named_window.spec.partition_by {
2892                                stack.push(e);
2893                            }
2894                            for e in &named_window.spec.order_by {
2895                                stack.push(&e.this);
2896                            }
2897                        }
2898                    }
2899                }
2900            }
2901
2902            // === Containers and special expressions ===
2903            Expression::Alias(a) => {
2904                stack.push(&a.this);
2905            }
2906            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
2907                stack.push(&c.this);
2908                if let Some(ref fmt) = c.format {
2909                    stack.push(fmt);
2910                }
2911                if let Some(ref def) = c.default {
2912                    stack.push(def);
2913                }
2914            }
2915            Expression::Paren(p) => {
2916                stack.push(&p.this);
2917            }
2918            Expression::Annotated(a) => {
2919                stack.push(&a.this);
2920            }
2921            Expression::Case(case) => {
2922                if let Some(ref operand) = case.operand {
2923                    stack.push(operand);
2924                }
2925                for (cond, result) in &case.whens {
2926                    stack.push(cond);
2927                    stack.push(result);
2928                }
2929                if let Some(ref else_expr) = case.else_ {
2930                    stack.push(else_expr);
2931                }
2932            }
2933            Expression::Collation(c) => {
2934                stack.push(&c.this);
2935            }
2936            Expression::In(i) => {
2937                stack.push(&i.this);
2938                for e in &i.expressions {
2939                    stack.push(e);
2940                }
2941                if let Some(ref q) = i.query {
2942                    stack.push(q);
2943                }
2944                if let Some(ref u) = i.unnest {
2945                    stack.push(u);
2946                }
2947            }
2948            Expression::Between(b) => {
2949                stack.push(&b.this);
2950                stack.push(&b.low);
2951                stack.push(&b.high);
2952            }
2953            Expression::IsNull(n) => {
2954                stack.push(&n.this);
2955            }
2956            Expression::IsTrue(t) | Expression::IsFalse(t) => {
2957                stack.push(&t.this);
2958            }
2959            Expression::IsJson(j) => {
2960                stack.push(&j.this);
2961            }
2962            Expression::Like(l) | Expression::ILike(l) => {
2963                stack.push(&l.left);
2964                stack.push(&l.right);
2965                if let Some(ref esc) = l.escape {
2966                    stack.push(esc);
2967                }
2968            }
2969            Expression::SimilarTo(s) => {
2970                stack.push(&s.this);
2971                stack.push(&s.pattern);
2972                if let Some(ref esc) = s.escape {
2973                    stack.push(esc);
2974                }
2975            }
2976            Expression::Ordered(o) => {
2977                stack.push(&o.this);
2978            }
2979            Expression::Array(a) => {
2980                for e in &a.expressions {
2981                    stack.push(e);
2982                }
2983            }
2984            Expression::Tuple(t) => {
2985                for e in &t.expressions {
2986                    stack.push(e);
2987                }
2988            }
2989            Expression::Struct(s) => {
2990                for (_, e) in &s.fields {
2991                    stack.push(e);
2992                }
2993            }
2994            Expression::Subscript(s) => {
2995                stack.push(&s.this);
2996                stack.push(&s.index);
2997            }
2998            Expression::Dot(d) => {
2999                stack.push(&d.this);
3000            }
3001            Expression::MethodCall(m) => {
3002                if !matches!(dialect, Some(DialectType::BigQuery))
3003                    || !is_bigquery_safe_namespace_receiver(&m.this)
3004                {
3005                    stack.push(&m.this);
3006                }
3007                for arg in &m.args {
3008                    stack.push(arg);
3009                }
3010            }
3011            Expression::ArraySlice(s) => {
3012                stack.push(&s.this);
3013                if let Some(ref start) = s.start {
3014                    stack.push(start);
3015                }
3016                if let Some(ref end) = s.end {
3017                    stack.push(end);
3018                }
3019            }
3020            Expression::Lambda(l) => {
3021                stack.push(&l.body);
3022            }
3023            Expression::NamedArgument(n) => {
3024                stack.push(&n.value);
3025            }
3026            Expression::Lateral(l) => {
3027                stack.push(&l.this);
3028                if let Some(ref view) = l.view {
3029                    stack.push(view);
3030                }
3031                if let Some(ref outer) = l.outer {
3032                    stack.push(outer);
3033                }
3034                if let Some(ref ordinality) = l.ordinality {
3035                    stack.push(ordinality);
3036                }
3037            }
3038            Expression::LateralView(lv) => {
3039                stack.push(&lv.this);
3040            }
3041            Expression::TryCatch(t) => {
3042                for stmt in &t.try_body {
3043                    stack.push(stmt);
3044                }
3045                if let Some(catch_body) = &t.catch_body {
3046                    for stmt in catch_body {
3047                        stack.push(stmt);
3048                    }
3049                }
3050            }
3051            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
3052                stack.push(e);
3053            }
3054
3055            // === Custom function structs ===
3056            Expression::Substring(f) => {
3057                stack.push(&f.this);
3058                stack.push(&f.start);
3059                if let Some(ref len) = f.length {
3060                    stack.push(len);
3061                }
3062            }
3063            Expression::Trim(f) => {
3064                stack.push(&f.this);
3065                if let Some(ref chars) = f.characters {
3066                    stack.push(chars);
3067                }
3068            }
3069            Expression::Replace(f) => {
3070                stack.push(&f.this);
3071                stack.push(&f.old);
3072                stack.push(&f.new);
3073            }
3074            Expression::IfFunc(f) => {
3075                stack.push(&f.condition);
3076                stack.push(&f.true_value);
3077                if let Some(ref fv) = f.false_value {
3078                    stack.push(fv);
3079                }
3080            }
3081            Expression::Nvl2(f) => {
3082                stack.push(&f.this);
3083                stack.push(&f.true_value);
3084                stack.push(&f.false_value);
3085            }
3086            Expression::ConcatWs(f) => {
3087                stack.push(&f.separator);
3088                for e in &f.expressions {
3089                    stack.push(e);
3090                }
3091            }
3092            Expression::Count(f) => {
3093                if let Some(ref this) = f.this {
3094                    stack.push(this);
3095                }
3096                if let Some(ref filter) = f.filter {
3097                    stack.push(filter);
3098                }
3099            }
3100            Expression::GroupConcat(f) => {
3101                stack.push(&f.this);
3102                if let Some(ref sep) = f.separator {
3103                    stack.push(sep);
3104                }
3105                if let Some(ref filter) = f.filter {
3106                    stack.push(filter);
3107                }
3108            }
3109            Expression::StringAgg(f) => {
3110                stack.push(&f.this);
3111                if let Some(ref sep) = f.separator {
3112                    stack.push(sep);
3113                }
3114                if let Some(ref filter) = f.filter {
3115                    stack.push(filter);
3116                }
3117                if let Some(ref limit) = f.limit {
3118                    stack.push(limit);
3119                }
3120            }
3121            Expression::ListAgg(f) => {
3122                stack.push(&f.this);
3123                if let Some(ref sep) = f.separator {
3124                    stack.push(sep);
3125                }
3126                if let Some(ref filter) = f.filter {
3127                    stack.push(filter);
3128                }
3129            }
3130            Expression::SumIf(f) => {
3131                stack.push(&f.this);
3132                stack.push(&f.condition);
3133                if let Some(ref filter) = f.filter {
3134                    stack.push(filter);
3135                }
3136            }
3137            Expression::DateAdd(f) | Expression::DateSub(f) => {
3138                stack.push(&f.this);
3139                stack.push(&f.interval);
3140            }
3141            Expression::DateDiff(f) => {
3142                stack.push(&f.this);
3143                stack.push(&f.expression);
3144            }
3145            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
3146                stack.push(&f.this);
3147            }
3148            Expression::Extract(f) => {
3149                stack.push(&f.this);
3150            }
3151            Expression::Round(f) => {
3152                stack.push(&f.this);
3153                if let Some(ref d) = f.decimals {
3154                    stack.push(d);
3155                }
3156            }
3157            Expression::Floor(f) => {
3158                stack.push(&f.this);
3159                if let Some(ref s) = f.scale {
3160                    stack.push(s);
3161                }
3162                if let Some(ref t) = f.to {
3163                    stack.push(t);
3164                }
3165            }
3166            Expression::Ceil(f) => {
3167                stack.push(&f.this);
3168                if let Some(ref d) = f.decimals {
3169                    stack.push(d);
3170                }
3171                if let Some(ref t) = f.to {
3172                    stack.push(t);
3173                }
3174            }
3175            Expression::Log(f) => {
3176                stack.push(&f.this);
3177                if let Some(ref b) = f.base {
3178                    stack.push(b);
3179                }
3180            }
3181            Expression::AtTimeZone(f) => {
3182                stack.push(&f.this);
3183                stack.push(&f.zone);
3184            }
3185            Expression::Lead(f) | Expression::Lag(f) => {
3186                stack.push(&f.this);
3187                if let Some(ref off) = f.offset {
3188                    stack.push(off);
3189                }
3190                if let Some(ref def) = f.default {
3191                    stack.push(def);
3192                }
3193            }
3194            Expression::FirstValue(f) | Expression::LastValue(f) => {
3195                stack.push(&f.this);
3196            }
3197            Expression::NthValue(f) => {
3198                stack.push(&f.this);
3199                stack.push(&f.offset);
3200            }
3201            Expression::Position(f) => {
3202                stack.push(&f.substring);
3203                stack.push(&f.string);
3204                if let Some(ref start) = f.start {
3205                    stack.push(start);
3206                }
3207            }
3208            Expression::Decode(f) => {
3209                stack.push(&f.this);
3210                for (search, result) in &f.search_results {
3211                    stack.push(search);
3212                    stack.push(result);
3213                }
3214                if let Some(ref def) = f.default {
3215                    stack.push(def);
3216                }
3217            }
3218            Expression::CharFunc(f) => {
3219                for arg in &f.args {
3220                    stack.push(arg);
3221                }
3222            }
3223            Expression::ArraySort(f) => {
3224                stack.push(&f.this);
3225                if let Some(ref cmp) = f.comparator {
3226                    stack.push(cmp);
3227                }
3228            }
3229            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
3230                stack.push(&f.this);
3231                stack.push(&f.separator);
3232                if let Some(ref nr) = f.null_replacement {
3233                    stack.push(nr);
3234                }
3235            }
3236            Expression::ArrayFilter(f) => {
3237                stack.push(&f.this);
3238                stack.push(&f.filter);
3239            }
3240            Expression::ArrayTransform(f) => {
3241                stack.push(&f.this);
3242                stack.push(&f.transform);
3243            }
3244            Expression::Sequence(f)
3245            | Expression::Generate(f)
3246            | Expression::ExplodingGenerateSeries(f) => {
3247                stack.push(&f.start);
3248                stack.push(&f.stop);
3249                if let Some(ref step) = f.step {
3250                    stack.push(step);
3251                }
3252            }
3253            Expression::JsonExtract(f)
3254            | Expression::JsonExtractScalar(f)
3255            | Expression::JsonQuery(f)
3256            | Expression::JsonValue(f) => {
3257                stack.push(&f.this);
3258                stack.push(&f.path);
3259            }
3260            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
3261                stack.push(&f.this);
3262                for p in &f.paths {
3263                    stack.push(p);
3264                }
3265            }
3266            Expression::JsonObject(f) => {
3267                for (k, v) in &f.pairs {
3268                    stack.push(k);
3269                    stack.push(v);
3270                }
3271            }
3272            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
3273                stack.push(&f.this);
3274                for (path, val) in &f.path_values {
3275                    stack.push(path);
3276                    stack.push(val);
3277                }
3278            }
3279            Expression::Overlay(f) => {
3280                stack.push(&f.this);
3281                stack.push(&f.replacement);
3282                stack.push(&f.from);
3283                if let Some(ref len) = f.length {
3284                    stack.push(len);
3285                }
3286            }
3287            Expression::Convert(f) => {
3288                stack.push(&f.this);
3289                if let Some(ref style) = f.style {
3290                    stack.push(style);
3291                }
3292            }
3293            Expression::ApproxPercentile(f) => {
3294                stack.push(&f.this);
3295                stack.push(&f.percentile);
3296                if let Some(ref acc) = f.accuracy {
3297                    stack.push(acc);
3298                }
3299                if let Some(ref filter) = f.filter {
3300                    stack.push(filter);
3301                }
3302            }
3303            Expression::Percentile(f)
3304            | Expression::PercentileCont(f)
3305            | Expression::PercentileDisc(f) => {
3306                stack.push(&f.this);
3307                stack.push(&f.percentile);
3308                if let Some(ref filter) = f.filter {
3309                    stack.push(filter);
3310                }
3311            }
3312            Expression::WithinGroup(f) => {
3313                stack.push(&f.this);
3314                for e in &f.order_by {
3315                    stack.push(&e.this);
3316                }
3317            }
3318            Expression::Left(f) | Expression::Right(f) => {
3319                stack.push(&f.this);
3320                stack.push(&f.length);
3321            }
3322            Expression::Repeat(f) => {
3323                stack.push(&f.this);
3324                stack.push(&f.times);
3325            }
3326            Expression::Lpad(f) | Expression::Rpad(f) => {
3327                stack.push(&f.this);
3328                stack.push(&f.length);
3329                if let Some(ref fill) = f.fill {
3330                    stack.push(fill);
3331                }
3332            }
3333            Expression::Split(f) => {
3334                stack.push(&f.this);
3335                stack.push(&f.delimiter);
3336            }
3337            Expression::RegexpLike(f) => {
3338                stack.push(&f.this);
3339                stack.push(&f.pattern);
3340                if let Some(ref flags) = f.flags {
3341                    stack.push(flags);
3342                }
3343            }
3344            Expression::RegexpReplace(f) => {
3345                stack.push(&f.this);
3346                stack.push(&f.pattern);
3347                stack.push(&f.replacement);
3348                if let Some(ref flags) = f.flags {
3349                    stack.push(flags);
3350                }
3351            }
3352            Expression::RegexpExtract(f) => {
3353                stack.push(&f.this);
3354                stack.push(&f.pattern);
3355                if let Some(ref group) = f.group {
3356                    stack.push(group);
3357                }
3358            }
3359            Expression::ToDate(f) => {
3360                stack.push(&f.this);
3361                if let Some(ref fmt) = f.format {
3362                    stack.push(fmt);
3363                }
3364            }
3365            Expression::ToTimestamp(f) => {
3366                stack.push(&f.this);
3367                if let Some(ref fmt) = f.format {
3368                    stack.push(fmt);
3369                }
3370            }
3371            Expression::DateFormat(f) | Expression::FormatDate(f) => {
3372                stack.push(&f.this);
3373                stack.push(&f.format);
3374            }
3375            Expression::LastDay(f) => {
3376                stack.push(&f.this);
3377            }
3378            Expression::FromUnixtime(f) => {
3379                stack.push(&f.this);
3380                if let Some(ref fmt) = f.format {
3381                    stack.push(fmt);
3382                }
3383            }
3384            Expression::UnixTimestamp(f) => {
3385                if let Some(ref this) = f.this {
3386                    stack.push(this);
3387                }
3388                if let Some(ref fmt) = f.format {
3389                    stack.push(fmt);
3390                }
3391            }
3392            Expression::MakeDate(f) => {
3393                stack.push(&f.year);
3394                stack.push(&f.month);
3395                stack.push(&f.day);
3396            }
3397            Expression::MakeTimestamp(f) => {
3398                stack.push(&f.year);
3399                stack.push(&f.month);
3400                stack.push(&f.day);
3401                stack.push(&f.hour);
3402                stack.push(&f.minute);
3403                stack.push(&f.second);
3404                if let Some(ref tz) = f.timezone {
3405                    stack.push(tz);
3406                }
3407            }
3408            Expression::TruncFunc(f) => {
3409                stack.push(&f.this);
3410                if let Some(ref d) = f.decimals {
3411                    stack.push(d);
3412                }
3413            }
3414            Expression::ArrayFunc(f) => {
3415                for e in &f.expressions {
3416                    stack.push(e);
3417                }
3418            }
3419            Expression::Unnest(f) => {
3420                stack.push(&f.this);
3421                for e in &f.expressions {
3422                    stack.push(e);
3423                }
3424            }
3425            Expression::StructFunc(f) => {
3426                for (_, e) in &f.fields {
3427                    stack.push(e);
3428                }
3429            }
3430            Expression::StructExtract(f) => {
3431                stack.push(&f.this);
3432            }
3433            Expression::NamedStruct(f) => {
3434                for (k, v) in &f.pairs {
3435                    stack.push(k);
3436                    stack.push(v);
3437                }
3438            }
3439            Expression::MapFunc(f) => {
3440                for k in &f.keys {
3441                    stack.push(k);
3442                }
3443                for v in &f.values {
3444                    stack.push(v);
3445                }
3446            }
3447            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
3448                stack.push(&f.this);
3449                stack.push(&f.transform);
3450            }
3451            Expression::JsonArrayAgg(f) => {
3452                stack.push(&f.this);
3453                if let Some(ref filter) = f.filter {
3454                    stack.push(filter);
3455                }
3456            }
3457            Expression::JsonObjectAgg(f) => {
3458                stack.push(&f.key);
3459                stack.push(&f.value);
3460                if let Some(ref filter) = f.filter {
3461                    stack.push(filter);
3462                }
3463            }
3464            Expression::NTile(f) => {
3465                if let Some(ref n) = f.num_buckets {
3466                    stack.push(n);
3467                }
3468            }
3469            Expression::Rand(f) => {
3470                if let Some(ref s) = f.seed {
3471                    stack.push(s);
3472                }
3473                if let Some(ref lo) = f.lower {
3474                    stack.push(lo);
3475                }
3476                if let Some(ref hi) = f.upper {
3477                    stack.push(hi);
3478                }
3479            }
3480            Expression::Any(q) | Expression::All(q) => {
3481                stack.push(&q.this);
3482                stack.push(&q.subquery);
3483            }
3484            Expression::Overlaps(o) => {
3485                if let Some(ref this) = o.this {
3486                    stack.push(this);
3487                }
3488                if let Some(ref expr) = o.expression {
3489                    stack.push(expr);
3490                }
3491                if let Some(ref ls) = o.left_start {
3492                    stack.push(ls);
3493                }
3494                if let Some(ref le) = o.left_end {
3495                    stack.push(le);
3496                }
3497                if let Some(ref rs) = o.right_start {
3498                    stack.push(rs);
3499                }
3500                if let Some(ref re) = o.right_end {
3501                    stack.push(re);
3502                }
3503            }
3504            Expression::Interval(i) => {
3505                if let Some(ref this) = i.this {
3506                    stack.push(this);
3507                }
3508            }
3509            Expression::TimeStrToTime(f) => {
3510                stack.push(&f.this);
3511                if let Some(ref zone) = f.zone {
3512                    stack.push(zone);
3513                }
3514            }
3515            Expression::JSONBExtractScalar(f) => {
3516                stack.push(&f.this);
3517                stack.push(&f.expression);
3518                if let Some(ref jt) = f.json_type {
3519                    stack.push(jt);
3520                }
3521            }
3522            Expression::JSONExtract(f) => {
3523                stack.push(&f.this);
3524                stack.push(&f.expression);
3525                for e in &f.expressions {
3526                    stack.push(e);
3527                }
3528                if let Some(ref option) = f.option {
3529                    stack.push(option);
3530                }
3531                if let Some(ref on_condition) = f.on_condition {
3532                    stack.push(on_condition);
3533                }
3534            }
3535
3536            // === True leaves and non-expression-bearing nodes ===
3537            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
3538            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
3539            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
3540            _ => {}
3541        }
3542    }
3543}
3544
3545// ---------------------------------------------------------------------------
3546// Tests
3547// ---------------------------------------------------------------------------
3548
3549#[cfg(test)]
3550mod tests {
3551    use super::*;
3552    use crate::dialects::{Dialect, DialectType};
3553    use crate::expressions::DataType;
3554    use crate::optimizer::annotate_types::annotate_types;
3555    use crate::parse_one;
3556    use crate::schema::{MappingSchema, Schema};
3557
3558    fn parse(sql: &str) -> Expression {
3559        let dialect = Dialect::get(DialectType::Generic);
3560        let ast = dialect.parse(sql).unwrap();
3561        ast.into_iter().next().unwrap()
3562    }
3563
3564    fn parse_dialect(sql: &str, dialect_type: DialectType) -> Expression {
3565        let dialect = Dialect::get(dialect_type);
3566        let ast = dialect.parse(sql).unwrap();
3567        ast.into_iter().next().unwrap()
3568    }
3569
3570    fn lineage_names(node: &LineageNode) -> Vec<String> {
3571        node.walk().map(|n| n.name.clone()).collect()
3572    }
3573
3574    fn assert_lineage_contains(node: &LineageNode, expected: &str) {
3575        let names = lineage_names(node);
3576        assert!(
3577            names.iter().any(|name| name == expected),
3578            "expected {expected} in lineage, got {names:?}"
3579        );
3580    }
3581
3582    #[test]
3583    fn test_simple_lineage() {
3584        let expr = parse("SELECT a FROM t");
3585        let node = lineage("a", &expr, None, false).unwrap();
3586
3587        assert_eq!(node.name, "a");
3588        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
3589        // Should trace to t.a
3590        let names = node.downstream_names();
3591        assert!(
3592            names.iter().any(|n| n == "t.a"),
3593            "Expected t.a in downstream, got: {:?}",
3594            names
3595        );
3596    }
3597
3598    #[test]
3599    fn test_lineage_walk() {
3600        let root = LineageNode {
3601            name: "col_a".to_string(),
3602            expression: Expression::Null(crate::expressions::Null),
3603            source: Expression::Null(crate::expressions::Null),
3604            downstream: vec![LineageNode::new(
3605                "t.a",
3606                Expression::Null(crate::expressions::Null),
3607                Expression::Null(crate::expressions::Null),
3608            )],
3609            source_name: String::new(),
3610            source_kind: SourceKind::Unknown,
3611            source_alias: None,
3612            reference_node_name: String::new(),
3613        };
3614
3615        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
3616        assert_eq!(names.len(), 2);
3617        assert_eq!(names[0], "col_a");
3618        assert_eq!(names[1], "t.a");
3619    }
3620
3621    #[test]
3622    fn test_aliased_column() {
3623        let expr = parse("SELECT a + 1 AS b FROM t");
3624        let node = lineage("b", &expr, None, false).unwrap();
3625
3626        assert_eq!(node.name, "b");
3627        // Should trace through the expression to t.a
3628        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3629        assert!(
3630            all_names.iter().any(|n| n.contains("a")),
3631            "Expected to trace to column a, got: {:?}",
3632            all_names
3633        );
3634    }
3635
3636    #[test]
3637    fn test_qualified_column() {
3638        let expr = parse("SELECT t.a FROM t");
3639        let node = lineage("a", &expr, None, false).unwrap();
3640
3641        assert_eq!(node.name, "a");
3642        let names = node.downstream_names();
3643        assert!(
3644            names.iter().any(|n| n == "t.a"),
3645            "Expected t.a, got: {:?}",
3646            names
3647        );
3648    }
3649
3650    #[test]
3651    fn test_unqualified_column() {
3652        let expr = parse("SELECT a FROM t");
3653        let node = lineage("a", &expr, None, false).unwrap();
3654
3655        // Unqualified but single source → resolved to t.a
3656        let names = node.downstream_names();
3657        assert!(
3658            names.iter().any(|n| n == "t.a"),
3659            "Expected t.a, got: {:?}",
3660            names
3661        );
3662    }
3663
3664    #[test]
3665    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
3666        let query = "SELECT name FROM users";
3667        let dialect = Dialect::get(DialectType::BigQuery);
3668        let expr = dialect
3669            .parse(query)
3670            .unwrap()
3671            .into_iter()
3672            .next()
3673            .expect("expected one expression");
3674
3675        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3676        schema
3677            .add_table("users", &[("name".into(), DataType::Text)], None)
3678            .expect("schema setup");
3679
3680        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
3681            .expect("lineage without schema");
3682        let mut expr_without = node_without_schema.expression.clone();
3683        annotate_types(
3684            &mut expr_without,
3685            Some(&schema),
3686            Some(DialectType::BigQuery),
3687        );
3688        assert_eq!(
3689            expr_without.inferred_type(),
3690            None,
3691            "Expected unresolved root type without schema-aware lineage qualification"
3692        );
3693
3694        let node_with_schema = lineage_with_schema(
3695            "name",
3696            &expr,
3697            Some(&schema),
3698            Some(DialectType::BigQuery),
3699            false,
3700        )
3701        .expect("lineage with schema");
3702        let mut expr_with = node_with_schema.expression.clone();
3703        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
3704
3705        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
3706    }
3707
3708    #[test]
3709    fn test_lineage_with_schema_correlated_scalar_subquery() {
3710        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
3711        let dialect = Dialect::get(DialectType::BigQuery);
3712        let expr = dialect
3713            .parse(query)
3714            .unwrap()
3715            .into_iter()
3716            .next()
3717            .expect("expected one expression");
3718
3719        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3720        schema
3721            .add_table(
3722                "t1",
3723                &[("id".into(), DataType::BigInt { length: None })],
3724                None,
3725            )
3726            .expect("schema setup");
3727        schema
3728            .add_table(
3729                "t2",
3730                &[
3731                    ("id".into(), DataType::BigInt { length: None }),
3732                    ("val".into(), DataType::BigInt { length: None }),
3733                ],
3734                None,
3735            )
3736            .expect("schema setup");
3737
3738        let node = lineage_with_schema(
3739            "id",
3740            &expr,
3741            Some(&schema),
3742            Some(DialectType::BigQuery),
3743            false,
3744        )
3745        .expect("lineage_with_schema should handle correlated scalar subqueries");
3746
3747        assert_eq!(node.name, "id");
3748    }
3749
3750    #[test]
3751    fn test_lineage_with_schema_join_using() {
3752        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
3753        let dialect = Dialect::get(DialectType::BigQuery);
3754        let expr = dialect
3755            .parse(query)
3756            .unwrap()
3757            .into_iter()
3758            .next()
3759            .expect("expected one expression");
3760
3761        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3762        schema
3763            .add_table(
3764                "t1",
3765                &[("a".into(), DataType::BigInt { length: None })],
3766                None,
3767            )
3768            .expect("schema setup");
3769        schema
3770            .add_table(
3771                "t2",
3772                &[("a".into(), DataType::BigInt { length: None })],
3773                None,
3774            )
3775            .expect("schema setup");
3776
3777        let node = lineage_with_schema(
3778            "a",
3779            &expr,
3780            Some(&schema),
3781            Some(DialectType::BigQuery),
3782            false,
3783        )
3784        .expect("lineage_with_schema should handle JOIN USING");
3785
3786        assert_eq!(node.name, "a");
3787    }
3788
3789    #[test]
3790    fn test_lineage_with_schema_qualified_table_name() {
3791        let query = "SELECT a FROM raw.t1";
3792        let dialect = Dialect::get(DialectType::BigQuery);
3793        let expr = dialect
3794            .parse(query)
3795            .unwrap()
3796            .into_iter()
3797            .next()
3798            .expect("expected one expression");
3799
3800        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3801        schema
3802            .add_table(
3803                "raw.t1",
3804                &[("a".into(), DataType::BigInt { length: None })],
3805                None,
3806            )
3807            .expect("schema setup");
3808
3809        let node = lineage_with_schema(
3810            "a",
3811            &expr,
3812            Some(&schema),
3813            Some(DialectType::BigQuery),
3814            false,
3815        )
3816        .expect("lineage_with_schema should handle dotted schema.table names");
3817
3818        assert_eq!(node.name, "a");
3819    }
3820
3821    #[test]
3822    fn test_lineage_with_schema_none_matches_lineage() {
3823        let expr = parse("SELECT a FROM t");
3824        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
3825        let with_none =
3826            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
3827
3828        assert_eq!(with_none.name, baseline.name);
3829        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
3830    }
3831
3832    #[test]
3833    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
3834        let dialect = Dialect::get(DialectType::BigQuery);
3835        let expr = dialect
3836            .parse("SELECT Name AS name FROM teams")
3837            .unwrap()
3838            .into_iter()
3839            .next()
3840            .expect("expected one expression");
3841
3842        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3843        schema
3844            .add_table(
3845                "teams",
3846                &[("Name".into(), DataType::String { length: None })],
3847                None,
3848            )
3849            .expect("schema setup");
3850
3851        let node = lineage_with_schema(
3852            "name",
3853            &expr,
3854            Some(&schema),
3855            Some(DialectType::BigQuery),
3856            false,
3857        )
3858        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
3859
3860        let names = node.downstream_names();
3861        assert!(
3862            names.iter().any(|n| n == "teams.Name"),
3863            "Expected teams.Name in downstream, got: {:?}",
3864            names
3865        );
3866    }
3867
3868    #[test]
3869    fn test_lineage_bigquery_mixed_case_alias_lookup() {
3870        let dialect = Dialect::get(DialectType::BigQuery);
3871        let expr = dialect
3872            .parse("SELECT Name AS Name FROM teams")
3873            .unwrap()
3874            .into_iter()
3875            .next()
3876            .expect("expected one expression");
3877
3878        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
3879            .expect("lineage should resolve mixed-case aliases in BigQuery");
3880
3881        assert_eq!(node.name, "name");
3882    }
3883
3884    #[test]
3885    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
3886        let expr = parse_one(
3887            r#"
3888SELECT date_val AS week_start
3889FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
3890"#,
3891            DialectType::BigQuery,
3892        )
3893        .expect("parse");
3894
3895        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
3896            .expect("lineage should resolve UNNEST alias as a source");
3897        let child = node
3898            .downstream
3899            .first()
3900            .expect("week_start should have downstream lineage");
3901
3902        assert_eq!(child.name, "_0.date_val");
3903        assert_eq!(child.source_name, "_0");
3904        assert_eq!(child.source_kind, SourceKind::Virtual);
3905        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
3906
3907        let Expression::Column(column) = &child.expression else {
3908            panic!(
3909                "expected downstream column expression, got {:?}",
3910                child.expression
3911            );
3912        };
3913        assert_eq!(column.name.name, "date_val");
3914        assert_eq!(
3915            column.table.as_ref().map(|table| table.name.as_str()),
3916            Some("_0")
3917        );
3918        assert!(
3919            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
3920            "expected UNNEST source expression, got {:?}",
3921            child.source
3922        );
3923    }
3924
3925    #[test]
3926    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
3927        let expr =
3928            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
3929
3930        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3931        let child = node.downstream.first().expect("id should have lineage");
3932
3933        assert_eq!(child.name, "date_val.id");
3934        assert_eq!(child.source_name, "date_val");
3935        assert_eq!(child.source_kind, SourceKind::Table);
3936        assert_eq!(child.source_alias, None);
3937    }
3938
3939    #[test]
3940    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
3941        let expr = parse_one(
3942            r#"
3943SELECT a.a AS first_value, b.b AS second_value
3944FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
3945JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
3946"#,
3947            DialectType::BigQuery,
3948        )
3949        .expect("parse");
3950
3951        let first =
3952            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3953        let second =
3954            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3955
3956        let first_child = first.downstream.first().expect("first source");
3957        let second_child = second.downstream.first().expect("second source");
3958
3959        assert_eq!(first_child.name, "_0.a");
3960        assert_eq!(first_child.source_name, "_0");
3961        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
3962        assert_eq!(first_child.source_kind, SourceKind::Virtual);
3963
3964        assert_eq!(second_child.name, "_1.b");
3965        assert_eq!(second_child.source_name, "_1");
3966        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
3967        assert_eq!(second_child.source_kind, SourceKind::Virtual);
3968    }
3969
3970    #[test]
3971    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
3972        let expr = parse_one(
3973            r#"
3974SELECT item.item AS item
3975FROM t JOIN UNNEST(t.items) AS item ON TRUE
3976"#,
3977            DialectType::BigQuery,
3978        )
3979        .expect("parse");
3980
3981        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
3982        let virtual_child = node.downstream.first().expect("virtual item source");
3983        assert_eq!(virtual_child.name, "_0.item");
3984        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
3985
3986        let real_child = virtual_child
3987            .downstream
3988            .first()
3989            .expect("UNNEST(t.items) should depend on t.items");
3990        assert_eq!(real_child.name, "t.items");
3991        assert_eq!(real_child.source_name, "t");
3992        assert_eq!(real_child.source_kind, SourceKind::Table);
3993    }
3994
3995    #[test]
3996    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
3997        let expr = parse_one(
3998            r#"
3999SELECT item AS item
4000FROM t JOIN UNNEST(t.items) AS item ON TRUE
4001"#,
4002            DialectType::BigQuery,
4003        )
4004        .expect("parse");
4005
4006        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4007        let virtual_child = node.downstream.first().expect("virtual item source");
4008        assert_eq!(virtual_child.name, "_0.item");
4009        assert_eq!(virtual_child.source_name, "_0");
4010        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4011        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
4012
4013        let real_child = virtual_child
4014            .downstream
4015            .first()
4016            .expect("UNNEST(t.items) should depend on t.items");
4017        assert_eq!(real_child.name, "t.items");
4018        assert_eq!(real_child.source_name, "t");
4019        assert_eq!(real_child.source_kind, SourceKind::Table);
4020    }
4021
4022    #[test]
4023    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
4024        let cases = [
4025            (
4026                DialectType::PostgreSQL,
4027                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
4028            ),
4029            (
4030                DialectType::Presto,
4031                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
4032            ),
4033            (
4034                DialectType::Trino,
4035                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
4036            ),
4037        ];
4038
4039        for (dialect, sql) in cases {
4040            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
4041            let node = lineage("out", &expr, Some(dialect), false)
4042                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
4043            let virtual_child = node
4044                .downstream
4045                .first()
4046                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
4047
4048            assert_eq!(
4049                virtual_child.name, "_0.x",
4050                "unexpected virtual child for {dialect:?}"
4051            );
4052            assert_eq!(virtual_child.source_name, "_0");
4053            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4054            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
4055
4056            let real_child = virtual_child
4057                .downstream
4058                .first()
4059                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
4060            assert_eq!(real_child.name, "t.items");
4061            assert_eq!(real_child.source_kind, SourceKind::Table);
4062        }
4063    }
4064
4065    #[test]
4066    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
4067        let cases = [
4068            (
4069                DialectType::Spark,
4070                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
4071            ),
4072            (
4073                DialectType::Hive,
4074                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
4075            ),
4076        ];
4077
4078        for (dialect, sql) in cases {
4079            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
4080            let node = lineage("out", &expr, Some(dialect), false)
4081                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
4082            let virtual_child = node
4083                .downstream
4084                .first()
4085                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
4086
4087            assert_eq!(virtual_child.name, "_0.x");
4088            assert_eq!(virtual_child.source_name, "_0");
4089            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4090            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
4091
4092            let real_child = virtual_child
4093                .downstream
4094                .first()
4095                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
4096            assert_eq!(real_child.name, "t.items");
4097            assert_eq!(real_child.source_kind, SourceKind::Table);
4098        }
4099    }
4100
4101    #[test]
4102    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
4103        let expr = parse_one(
4104            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
4105            DialectType::Snowflake,
4106        )
4107        .expect("parse");
4108
4109        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
4110        let virtual_child = node.downstream.first().expect("virtual flatten source");
4111        assert_eq!(virtual_child.name, "_0.value");
4112        assert_eq!(virtual_child.source_name, "_0");
4113        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4114        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
4115
4116        let real_child = virtual_child
4117            .downstream
4118            .first()
4119            .expect("FLATTEN input should depend on raw_events.payload");
4120        assert_eq!(real_child.name, "raw_events.payload");
4121        assert_eq!(real_child.source_kind, SourceKind::Table);
4122    }
4123
4124    #[test]
4125    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
4126        let expr = parse_one(
4127            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
4128            DialectType::Snowflake,
4129        )
4130        .expect("parse");
4131
4132        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4133        schema
4134            .add_table(
4135                "fact.some_daily_metrics",
4136                &[("date_utc".to_string(), DataType::Date)],
4137                None,
4138            )
4139            .expect("schema setup");
4140
4141        let node = lineage_with_schema(
4142            "recency",
4143            &expr,
4144            Some(&schema),
4145            Some(DialectType::Snowflake),
4146            false,
4147        )
4148        .expect("lineage_with_schema should not treat date part as a column");
4149
4150        let names = node.downstream_names();
4151        assert!(
4152            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4153            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4154            names
4155        );
4156        assert!(
4157            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4158            "Did not expect date part to appear as lineage column, got: {:?}",
4159            names
4160        );
4161    }
4162
4163    #[test]
4164    fn test_snowflake_datediff_parses_to_typed_ast() {
4165        let expr = parse_one(
4166            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
4167            DialectType::Snowflake,
4168        )
4169        .expect("parse");
4170
4171        match expr {
4172            Expression::Select(select) => match &select.expressions[0] {
4173                Expression::Alias(alias) => match &alias.this {
4174                    Expression::DateDiff(f) => {
4175                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
4176                    }
4177                    other => panic!("expected DateDiff, got {other:?}"),
4178                },
4179                other => panic!("expected Alias, got {other:?}"),
4180            },
4181            other => panic!("expected Select, got {other:?}"),
4182        }
4183    }
4184
4185    #[test]
4186    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
4187        let expr = parse_one(
4188            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4189            DialectType::Snowflake,
4190        )
4191        .expect("parse");
4192
4193        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4194        schema
4195            .add_table(
4196                "fact.some_daily_metrics",
4197                &[("date_utc".to_string(), DataType::Date)],
4198                None,
4199            )
4200            .expect("schema setup");
4201
4202        let node = lineage_with_schema(
4203            "next_day",
4204            &expr,
4205            Some(&schema),
4206            Some(DialectType::Snowflake),
4207            false,
4208        )
4209        .expect("lineage_with_schema should not treat DATEADD date part as a column");
4210
4211        let names = node.downstream_names();
4212        assert!(
4213            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4214            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4215            names
4216        );
4217        assert!(
4218            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4219            "Did not expect date part to appear as lineage column, got: {:?}",
4220            names
4221        );
4222    }
4223
4224    #[test]
4225    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
4226        let expr = parse_one(
4227            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4228            DialectType::Snowflake,
4229        )
4230        .expect("parse");
4231
4232        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4233        schema
4234            .add_table(
4235                "fact.some_daily_metrics",
4236                &[("date_utc".to_string(), DataType::Date)],
4237                None,
4238            )
4239            .expect("schema setup");
4240
4241        let node = lineage_with_schema(
4242            "day_part",
4243            &expr,
4244            Some(&schema),
4245            Some(DialectType::Snowflake),
4246            false,
4247        )
4248        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
4249
4250        let names = node.downstream_names();
4251        assert!(
4252            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4253            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4254            names
4255        );
4256        assert!(
4257            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4258            "Did not expect date part to appear as lineage column, got: {:?}",
4259            names
4260        );
4261    }
4262
4263    #[test]
4264    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
4265        let expr = parse_one(
4266            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4267            DialectType::Snowflake,
4268        )
4269        .expect("parse");
4270
4271        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4272        schema
4273            .add_table(
4274                "fact.some_daily_metrics",
4275                &[("date_utc".to_string(), DataType::Date)],
4276                None,
4277            )
4278            .expect("schema setup");
4279
4280        let node = lineage_with_schema(
4281            "day_part",
4282            &expr,
4283            Some(&schema),
4284            Some(DialectType::Snowflake),
4285            false,
4286        )
4287        .expect("quoted DATE_PART should continue to work");
4288
4289        let names = node.downstream_names();
4290        assert!(
4291            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4292            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4293            names
4294        );
4295    }
4296
4297    #[test]
4298    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
4299        let expr = parse_one(
4300            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4301            DialectType::Snowflake,
4302        )
4303        .expect("parse");
4304
4305        match expr {
4306            Expression::Select(select) => match &select.expressions[0] {
4307                Expression::Alias(alias) => match &alias.this {
4308                    Expression::Function(f) => {
4309                        assert_eq!(f.name.to_uppercase(), "DATEADD");
4310                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4311                    }
4312                    other => panic!("expected generic DATEADD function, got {other:?}"),
4313                },
4314                other => panic!("expected Alias, got {other:?}"),
4315            },
4316            other => panic!("expected Select, got {other:?}"),
4317        }
4318    }
4319
4320    #[test]
4321    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
4322        let expr = parse_one(
4323            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4324            DialectType::Snowflake,
4325        )
4326        .expect("parse");
4327
4328        match expr {
4329            Expression::Select(select) => match &select.expressions[0] {
4330                Expression::Alias(alias) => match &alias.this {
4331                    Expression::Function(f) => {
4332                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
4333                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4334                    }
4335                    other => panic!("expected generic DATE_PART function, got {other:?}"),
4336                },
4337                other => panic!("expected Alias, got {other:?}"),
4338            },
4339            other => panic!("expected Select, got {other:?}"),
4340        }
4341    }
4342
4343    #[test]
4344    fn test_snowflake_date_part_string_literal_stays_generic_function() {
4345        let expr = parse_one(
4346            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4347            DialectType::Snowflake,
4348        )
4349        .expect("parse");
4350
4351        match expr {
4352            Expression::Select(select) => match &select.expressions[0] {
4353                Expression::Alias(alias) => match &alias.this {
4354                    Expression::Function(f) => {
4355                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
4356                    }
4357                    other => panic!("expected generic DATE_PART function, got {other:?}"),
4358                },
4359                other => panic!("expected Alias, got {other:?}"),
4360            },
4361            other => panic!("expected Select, got {other:?}"),
4362        }
4363    }
4364
4365    #[test]
4366    fn test_lineage_join() {
4367        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4368
4369        let node_a = lineage("a", &expr, None, false).unwrap();
4370        let names_a = node_a.downstream_names();
4371        assert!(
4372            names_a.iter().any(|n| n == "t.a"),
4373            "Expected t.a, got: {:?}",
4374            names_a
4375        );
4376
4377        let node_b = lineage("b", &expr, None, false).unwrap();
4378        let names_b = node_b.downstream_names();
4379        assert!(
4380            names_b.iter().any(|n| n == "s.b"),
4381            "Expected s.b, got: {:?}",
4382            names_b
4383        );
4384    }
4385
4386    #[test]
4387    fn test_lineage_alias_leaf_has_resolved_source_name() {
4388        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
4389        let node = lineage("col1", &expr, None, false).unwrap();
4390
4391        // Keep alias in the display lineage edge.
4392        let names = node.downstream_names();
4393        assert!(
4394            names.iter().any(|n| n == "t1.col1"),
4395            "Expected aliased column edge t1.col1, got: {:?}",
4396            names
4397        );
4398
4399        // Leaf should expose the resolved base table for consumers.
4400        let leaf = node
4401            .downstream
4402            .iter()
4403            .find(|n| n.name == "t1.col1")
4404            .expect("Expected t1.col1 leaf");
4405        assert_eq!(leaf.source_name, "table1");
4406        match &leaf.source {
4407            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
4408            _ => panic!("Expected leaf source to be a table expression"),
4409        }
4410    }
4411
4412    #[test]
4413    fn test_lineage_derived_table() {
4414        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
4415        let node = lineage("a", &expr, None, false).unwrap();
4416
4417        assert_eq!(node.name, "a");
4418        // Should trace through the derived table to t.a
4419        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4420        assert!(
4421            all_names.iter().any(|n| n == "t.a"),
4422            "Expected to trace through derived table to t.a, got: {:?}",
4423            all_names
4424        );
4425    }
4426
4427    #[test]
4428    fn test_lineage_cte() {
4429        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
4430        let node = lineage("a", &expr, None, false).unwrap();
4431
4432        assert_eq!(node.name, "a");
4433        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4434        assert!(
4435            all_names.iter().any(|n| n == "t.a"),
4436            "Expected to trace through CTE to t.a, got: {:?}",
4437            all_names
4438        );
4439    }
4440
4441    #[test]
4442    fn test_lineage_union() {
4443        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
4444        let node = lineage("a", &expr, None, false).unwrap();
4445
4446        assert_eq!(node.name, "a");
4447        // Should have 2 downstream branches
4448        assert_eq!(
4449            node.downstream.len(),
4450            2,
4451            "Expected 2 branches for UNION, got {}",
4452            node.downstream.len()
4453        );
4454    }
4455
4456    #[test]
4457    fn test_lineage_cte_union() {
4458        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
4459        let node = lineage("a", &expr, None, false).unwrap();
4460
4461        // Should trace through CTE into both UNION branches
4462        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4463        assert!(
4464            all_names.len() >= 3,
4465            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
4466            all_names
4467        );
4468    }
4469
4470    #[test]
4471    fn test_lineage_star() {
4472        let expr = parse("SELECT * FROM t");
4473        let node = lineage("*", &expr, None, false).unwrap();
4474
4475        assert_eq!(node.name, "*");
4476        // Should have downstream for table t
4477        assert!(
4478            !node.downstream.is_empty(),
4479            "Star should produce downstream nodes"
4480        );
4481    }
4482
4483    #[test]
4484    fn test_lineage_subquery_in_select() {
4485        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
4486        let node = lineage("x", &expr, None, false).unwrap();
4487
4488        assert_eq!(node.name, "x");
4489        // Should have traced into the scalar subquery
4490        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4491        assert!(
4492            all_names.len() >= 2,
4493            "Expected tracing into scalar subquery, got: {:?}",
4494            all_names
4495        );
4496    }
4497
4498    #[test]
4499    fn test_lineage_multiple_columns() {
4500        let expr = parse("SELECT a, b FROM t");
4501
4502        let node_a = lineage("a", &expr, None, false).unwrap();
4503        let node_b = lineage("b", &expr, None, false).unwrap();
4504
4505        assert_eq!(node_a.name, "a");
4506        assert_eq!(node_b.name, "b");
4507
4508        // Each should trace independently
4509        let names_a = node_a.downstream_names();
4510        let names_b = node_b.downstream_names();
4511        assert!(names_a.iter().any(|n| n == "t.a"));
4512        assert!(names_b.iter().any(|n| n == "t.b"));
4513    }
4514
4515    #[test]
4516    fn test_get_source_tables() {
4517        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4518        let node = lineage("a", &expr, None, false).unwrap();
4519
4520        let tables = get_source_tables(&node);
4521        assert!(
4522            tables.contains("t"),
4523            "Expected source table 't', got: {:?}",
4524            tables
4525        );
4526    }
4527
4528    #[test]
4529    fn test_lineage_column_not_found() {
4530        let expr = parse("SELECT a FROM t");
4531        let result = lineage("nonexistent", &expr, None, false);
4532        assert!(result.is_err());
4533    }
4534
4535    #[test]
4536    fn test_lineage_nested_cte() {
4537        let expr = parse(
4538            "WITH cte1 AS (SELECT a FROM t), \
4539             cte2 AS (SELECT a FROM cte1) \
4540             SELECT a FROM cte2",
4541        );
4542        let node = lineage("a", &expr, None, false).unwrap();
4543
4544        // Should trace through cte2 → cte1 → t
4545        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4546        assert!(
4547            all_names.len() >= 3,
4548            "Expected to trace through nested CTEs, got: {:?}",
4549            all_names
4550        );
4551    }
4552
4553    #[test]
4554    fn test_trim_selects_true() {
4555        let expr = parse("SELECT a, b, c FROM t");
4556        let node = lineage("a", &expr, None, true).unwrap();
4557
4558        // The source should be trimmed to only include 'a'
4559        if let Expression::Select(select) = &node.source {
4560            assert_eq!(
4561                select.expressions.len(),
4562                1,
4563                "Trimmed source should have 1 expression, got {}",
4564                select.expressions.len()
4565            );
4566        } else {
4567            panic!("Expected Select source");
4568        }
4569    }
4570
4571    #[test]
4572    fn test_trim_selects_false() {
4573        let expr = parse("SELECT a, b, c FROM t");
4574        let node = lineage("a", &expr, None, false).unwrap();
4575
4576        // The source should keep all columns
4577        if let Expression::Select(select) = &node.source {
4578            assert_eq!(
4579                select.expressions.len(),
4580                3,
4581                "Untrimmed source should have 3 expressions"
4582            );
4583        } else {
4584            panic!("Expected Select source");
4585        }
4586    }
4587
4588    #[test]
4589    fn test_lineage_expression_in_select() {
4590        let expr = parse("SELECT a + b AS c FROM t");
4591        let node = lineage("c", &expr, None, false).unwrap();
4592
4593        // Should trace to both a and b from t
4594        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4595        assert!(
4596            all_names.len() >= 3,
4597            "Expected to trace a + b to both columns, got: {:?}",
4598            all_names
4599        );
4600    }
4601
4602    #[test]
4603    fn test_set_operation_by_index() {
4604        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
4605
4606        // Trace column "a" which is at index 0
4607        let node = lineage("a", &expr, None, false).unwrap();
4608
4609        // UNION branches should be traced by index
4610        assert_eq!(node.downstream.len(), 2);
4611    }
4612
4613    // --- Tests for column lineage inside function calls (issue #18) ---
4614
4615    fn print_node(node: &LineageNode, indent: usize) {
4616        let pad = "  ".repeat(indent);
4617        println!(
4618            "{pad}name={:?} source_name={:?}",
4619            node.name, node.source_name
4620        );
4621        for child in &node.downstream {
4622            print_node(child, indent + 1);
4623        }
4624    }
4625
4626    #[test]
4627    fn test_issue18_repro() {
4628        // Exact scenario from the issue
4629        let query = "SELECT UPPER(name) as upper_name FROM users";
4630        println!("Query: {query}\n");
4631
4632        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
4633        let exprs = dialect.parse(query).unwrap();
4634        let expr = &exprs[0];
4635
4636        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
4637        println!("lineage(\"upper_name\"):");
4638        print_node(&node, 1);
4639
4640        let names = node.downstream_names();
4641        assert!(
4642            names.iter().any(|n| n == "users.name"),
4643            "Expected users.name in downstream, got: {:?}",
4644            names
4645        );
4646    }
4647
4648    #[test]
4649    fn test_lineage_bigquery_safe_namespace_issue207() {
4650        let query = r#"
4651WITH import_cte AS (
4652  SELECT timestamp, data, operation
4653  FROM `project`.`dataset`.`source_table`
4654),
4655transform_cte AS (
4656  SELECT
4657    timestamp,
4658    SAFE.PARSE_JSON(data) AS json_data
4659  FROM import_cte
4660)
4661SELECT json_data FROM transform_cte
4662"#;
4663        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
4664        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4665            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4666        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4667
4668        assert!(
4669            names.iter().any(|name| name == "source_table.data"),
4670            "expected source_table.data in lineage, got {names:?}"
4671        );
4672        assert!(
4673            !names
4674                .iter()
4675                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
4676            "did not expect SAFE namespace receiver in lineage, got {names:?}"
4677        );
4678    }
4679
4680    #[test]
4681    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
4682        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
4683        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4684            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4685        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4686
4687        assert!(
4688            names.iter().any(|name| name == "t.data"),
4689            "expected t.data in lineage, got {names:?}"
4690        );
4691        assert!(
4692            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
4693            "did not expect SAFE namespace receiver in lineage, got {names:?}"
4694        );
4695    }
4696
4697    #[test]
4698    fn test_lineage_method_call_receiver_control() {
4699        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
4700        let node = lineage("out", &expr, None, false).expect("lineage");
4701        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4702
4703        assert!(
4704            names.iter().any(|name| name == "t.obj"),
4705            "expected ordinary method receiver to remain in lineage, got {names:?}"
4706        );
4707        assert!(
4708            names.iter().any(|name| name == "t.arg"),
4709            "expected method argument in lineage, got {names:?}"
4710        );
4711    }
4712
4713    #[test]
4714    fn test_lineage_upper_function() {
4715        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
4716        let node = lineage("upper_name", &expr, None, false).unwrap();
4717
4718        let names = node.downstream_names();
4719        assert!(
4720            names.iter().any(|n| n == "users.name"),
4721            "Expected users.name in downstream, got: {:?}",
4722            names
4723        );
4724    }
4725
4726    #[test]
4727    fn test_lineage_round_function() {
4728        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
4729        let node = lineage("rounded", &expr, None, false).unwrap();
4730
4731        let names = node.downstream_names();
4732        assert!(
4733            names.iter().any(|n| n == "products.price"),
4734            "Expected products.price in downstream, got: {:?}",
4735            names
4736        );
4737    }
4738
4739    #[test]
4740    fn test_lineage_coalesce_function() {
4741        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
4742        let node = lineage("val", &expr, None, false).unwrap();
4743
4744        let names = node.downstream_names();
4745        assert!(
4746            names.iter().any(|n| n == "t.a"),
4747            "Expected t.a in downstream, got: {:?}",
4748            names
4749        );
4750        assert!(
4751            names.iter().any(|n| n == "t.b"),
4752            "Expected t.b in downstream, got: {:?}",
4753            names
4754        );
4755    }
4756
4757    #[test]
4758    fn test_lineage_count_function() {
4759        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
4760        let node = lineage("cnt", &expr, None, false).unwrap();
4761
4762        let names = node.downstream_names();
4763        assert!(
4764            names.iter().any(|n| n == "t.id"),
4765            "Expected t.id in downstream, got: {:?}",
4766            names
4767        );
4768    }
4769
4770    #[test]
4771    fn test_lineage_sum_function() {
4772        let expr = parse("SELECT SUM(amount) AS total FROM t");
4773        let node = lineage("total", &expr, None, false).unwrap();
4774
4775        let names = node.downstream_names();
4776        assert!(
4777            names.iter().any(|n| n == "t.amount"),
4778            "Expected t.amount in downstream, got: {:?}",
4779            names
4780        );
4781    }
4782
4783    #[test]
4784    fn test_lineage_case_with_nested_functions() {
4785        let expr =
4786            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
4787        let node = lineage("result", &expr, None, false).unwrap();
4788
4789        let names = node.downstream_names();
4790        assert!(
4791            names.iter().any(|n| n == "t.x"),
4792            "Expected t.x in downstream, got: {:?}",
4793            names
4794        );
4795        assert!(
4796            names.iter().any(|n| n == "t.name"),
4797            "Expected t.name in downstream, got: {:?}",
4798            names
4799        );
4800    }
4801
4802    #[test]
4803    fn test_lineage_substring_function() {
4804        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
4805        let node = lineage("short", &expr, None, false).unwrap();
4806
4807        let names = node.downstream_names();
4808        assert!(
4809            names.iter().any(|n| n == "t.name"),
4810            "Expected t.name in downstream, got: {:?}",
4811            names
4812        );
4813    }
4814
4815    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
4816
4817    #[test]
4818    fn test_lineage_cte_select_star() {
4819        // Ported from sqlglot: test_lineage_source_with_star
4820        // WITH y AS (SELECT * FROM x) SELECT a FROM y
4821        // After star expansion: SELECT y.a AS a FROM y
4822        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
4823        let node = lineage("a", &expr, None, false).unwrap();
4824
4825        assert_eq!(node.name, "a");
4826        // Should successfully resolve column 'a' through the CTE
4827        // (previously failed with "Cannot find column 'a' in query")
4828        assert!(
4829            !node.downstream.is_empty(),
4830            "Expected downstream nodes tracing through CTE, got none"
4831        );
4832    }
4833
4834    #[test]
4835    fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
4836        let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
4837        let node = lineage("x", &expr, None, false).unwrap();
4838
4839        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4840        assert!(
4841            all_names.iter().any(|name| name == "t.x"),
4842            "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
4843            all_names
4844        );
4845
4846        let cte_node = node
4847            .walk()
4848            .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
4849            .expect("expected CTE hop with source_name c");
4850        assert_eq!(cte_node.source_kind, SourceKind::Cte);
4851        assert_eq!(cte_node.source_name, "c");
4852    }
4853
4854    #[test]
4855    fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
4856        let expr = parse(
4857            "WITH c AS (SELECT * FROM t) \
4858             SELECT SUM(c.x) AS s FROM c GROUP BY 1",
4859        );
4860        let node = lineage("s", &expr, None, false).unwrap();
4861
4862        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4863        assert!(
4864            all_names.iter().any(|name| name == "t.x"),
4865            "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
4866            all_names
4867        );
4868    }
4869
4870    #[test]
4871    fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
4872        let expr = parse(
4873            "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
4874             SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
4875        );
4876        let node = lineage("s", &expr, None, false).unwrap();
4877
4878        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4879        assert!(
4880            all_names.iter().any(|name| name == "t2.x"),
4881            "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
4882            all_names
4883        );
4884    }
4885
4886    #[test]
4887    fn test_lineage_schema_less_chained_cte_star_passthrough() {
4888        let expr = parse(
4889            "WITH c1 AS (SELECT * FROM t), \
4890             c2 AS (SELECT * FROM c1), \
4891             c3 AS (SELECT * FROM c2) \
4892             SELECT c3.x FROM c3",
4893        );
4894        let node = lineage("x", &expr, None, false).unwrap();
4895
4896        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4897        assert!(
4898            all_names.iter().any(|name| name == "t.x"),
4899            "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4900            all_names
4901        );
4902    }
4903
4904    #[test]
4905    fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4906        let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4907        let result = lineage("x", &expr, None, false);
4908
4909        assert!(
4910            result.is_err(),
4911            "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4912            result
4913        );
4914    }
4915
4916    #[test]
4917    fn test_lineage_cte_select_star_renamed_column() {
4918        // dbt standard pattern: CTE with column rename + outer SELECT *
4919        // This is the primary use case for dbt projects (jaffle-shop etc.)
4920        let expr =
4921            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
4922        let node = lineage("customer_id", &expr, None, false).unwrap();
4923
4924        assert_eq!(node.name, "customer_id");
4925        // Should trace customer_id → renamed CTE → source.id
4926        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4927        assert!(
4928            all_names.len() >= 2,
4929            "Expected at least 2 nodes (customer_id → source), got: {:?}",
4930            all_names
4931        );
4932    }
4933
4934    #[test]
4935    fn test_lineage_cte_select_star_multiple_columns() {
4936        // CTE exposes multiple columns, outer SELECT * should resolve each
4937        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
4938
4939        for col in &["a", "b", "c"] {
4940            let node = lineage(col, &expr, None, false).unwrap();
4941            assert_eq!(node.name, *col);
4942            // Verify lineage resolves without error (star expanded to explicit columns)
4943            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4944            assert!(
4945                all_names.len() >= 2,
4946                "Expected at least 2 nodes for column {}, got: {:?}",
4947                col,
4948                all_names
4949            );
4950        }
4951    }
4952
4953    #[test]
4954    fn test_lineage_nested_cte_select_star() {
4955        // Nested CTE star expansion: cte2 references cte1 via SELECT *
4956        let expr = parse(
4957            "WITH cte1 AS (SELECT a FROM t), \
4958             cte2 AS (SELECT * FROM cte1) \
4959             SELECT * FROM cte2",
4960        );
4961        let node = lineage("a", &expr, None, false).unwrap();
4962
4963        assert_eq!(node.name, "a");
4964        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4965        assert!(
4966            all_names.len() >= 3,
4967            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
4968            all_names
4969        );
4970    }
4971
4972    #[test]
4973    fn test_lineage_three_level_nested_cte_star() {
4974        // Three-level nested CTE: cte3 → cte2 → cte1 → t
4975        let expr = parse(
4976            "WITH cte1 AS (SELECT x FROM t), \
4977             cte2 AS (SELECT * FROM cte1), \
4978             cte3 AS (SELECT * FROM cte2) \
4979             SELECT * FROM cte3",
4980        );
4981        let node = lineage("x", &expr, None, false).unwrap();
4982
4983        assert_eq!(node.name, "x");
4984        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4985        assert!(
4986            all_names.len() >= 4,
4987            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
4988            all_names
4989        );
4990    }
4991
4992    #[test]
4993    fn test_lineage_cte_union_star() {
4994        // CTE with UNION body, outer SELECT * should resolve from left branch
4995        let expr = parse(
4996            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
4997             SELECT * FROM cte",
4998        );
4999        let node = lineage("a", &expr, None, false).unwrap();
5000
5001        assert_eq!(node.name, "a");
5002        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5003        assert!(
5004            all_names.len() >= 2,
5005            "Expected at least 2 nodes for CTE union star, got: {:?}",
5006            all_names
5007        );
5008    }
5009
5010    #[test]
5011    fn test_lineage_cte_star_unknown_table() {
5012        // When CTE references an unknown table, star expansion is skipped gracefully
5013        // and lineage falls back to normal resolution (which may fail)
5014        let expr = parse(
5015            "WITH cte AS (SELECT * FROM unknown_table) \
5016             SELECT * FROM cte",
5017        );
5018        // This should not panic — it may succeed or fail depending on resolution,
5019        // but should not crash
5020        let _result = lineage("x", &expr, None, false);
5021    }
5022
5023    #[test]
5024    fn test_lineage_cte_explicit_columns() {
5025        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
5026        let expr = parse(
5027            "WITH cte(x, y) AS (SELECT a, b FROM t) \
5028             SELECT * FROM cte",
5029        );
5030        let node = lineage("x", &expr, None, false).unwrap();
5031        assert_eq!(node.name, "x");
5032    }
5033
5034    #[test]
5035    fn test_lineage_cte_qualified_star() {
5036        // Qualified star: SELECT cte.* FROM cte
5037        let expr = parse(
5038            "WITH cte AS (SELECT a, b FROM t) \
5039             SELECT cte.* FROM cte",
5040        );
5041        for col in &["a", "b"] {
5042            let node = lineage(col, &expr, None, false).unwrap();
5043            assert_eq!(node.name, *col);
5044            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5045            assert!(
5046                all_names.len() >= 2,
5047                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
5048                col,
5049                all_names
5050            );
5051        }
5052    }
5053
5054    #[test]
5055    fn test_lineage_subquery_select_star() {
5056        // Ported from sqlglot: test_select_star
5057        // SELECT x FROM (SELECT * FROM table_a)
5058        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
5059        let node = lineage("x", &expr, None, false).unwrap();
5060
5061        assert_eq!(node.name, "x");
5062        assert!(
5063            !node.downstream.is_empty(),
5064            "Expected downstream nodes for subquery with SELECT *, got none"
5065        );
5066    }
5067
5068    #[test]
5069    fn test_lineage_cte_star_with_schema_external_table() {
5070        // CTE references an external table via SELECT * — schema enables expansion
5071        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
5072SELECT * FROM orders"#;
5073        let expr = parse(sql);
5074
5075        let mut schema = MappingSchema::new();
5076        let cols = vec![
5077            ("order_id".to_string(), DataType::Unknown),
5078            ("customer_id".to_string(), DataType::Unknown),
5079            ("amount".to_string(), DataType::Unknown),
5080        ];
5081        schema.add_table("stg_orders", &cols, None).unwrap();
5082
5083        let node =
5084            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5085                .unwrap();
5086        assert_eq!(node.name, "order_id");
5087    }
5088
5089    #[test]
5090    fn test_lineage_cte_star_with_schema_three_part_name() {
5091        // CTE references an external table with fully-qualified 3-part name
5092        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
5093SELECT * FROM orders"#;
5094        let expr = parse(sql);
5095
5096        let mut schema = MappingSchema::new();
5097        let cols = vec![
5098            ("order_id".to_string(), DataType::Unknown),
5099            ("customer_id".to_string(), DataType::Unknown),
5100        ];
5101        schema
5102            .add_table("db.schema.stg_orders", &cols, None)
5103            .unwrap();
5104
5105        let node = lineage_with_schema(
5106            "customer_id",
5107            &expr,
5108            Some(&schema as &dyn Schema),
5109            None,
5110            false,
5111        )
5112        .unwrap();
5113        assert_eq!(node.name, "customer_id");
5114    }
5115
5116    #[test]
5117    fn test_lineage_cte_star_with_schema_nested() {
5118        // Nested CTEs: outer CTE references inner CTE with SELECT *,
5119        // inner CTE references external table with SELECT *
5120        let sql = r#"WITH
5121            raw AS (SELECT * FROM external_table),
5122            enriched AS (SELECT * FROM raw)
5123        SELECT * FROM enriched"#;
5124        let expr = parse(sql);
5125
5126        let mut schema = MappingSchema::new();
5127        let cols = vec![
5128            ("id".to_string(), DataType::Unknown),
5129            ("name".to_string(), DataType::Unknown),
5130        ];
5131        schema.add_table("external_table", &cols, None).unwrap();
5132
5133        let node =
5134            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5135        assert_eq!(node.name, "name");
5136    }
5137
5138    #[test]
5139    fn test_lineage_cte_qualified_star_with_schema() {
5140        // CTE uses qualified star (orders.*) from a CTE whose columns
5141        // come from an external table via SELECT *
5142        let sql = r#"WITH
5143            orders AS (SELECT * FROM stg_orders),
5144            enriched AS (
5145                SELECT orders.*, 'extra' AS extra
5146                FROM orders
5147            )
5148        SELECT * FROM enriched"#;
5149        let expr = parse(sql);
5150
5151        let mut schema = MappingSchema::new();
5152        let cols = vec![
5153            ("order_id".to_string(), DataType::Unknown),
5154            ("total".to_string(), DataType::Unknown),
5155        ];
5156        schema.add_table("stg_orders", &cols, None).unwrap();
5157
5158        let node =
5159            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5160                .unwrap();
5161        assert_eq!(node.name, "order_id");
5162
5163        // Also verify the extra column works
5164        let extra =
5165            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5166        assert_eq!(extra.name, "extra");
5167    }
5168
5169    #[test]
5170    fn test_lineage_cte_star_without_schema_still_works() {
5171        // Without schema, CTE-to-CTE star expansion still works
5172        let sql = r#"WITH
5173            cte1 AS (SELECT id, name FROM raw_table),
5174            cte2 AS (SELECT * FROM cte1)
5175        SELECT * FROM cte2"#;
5176        let expr = parse(sql);
5177
5178        // No schema — should still resolve through CTE chain
5179        let node = lineage("id", &expr, None, false).unwrap();
5180        assert_eq!(node.name, "id");
5181    }
5182
5183    #[test]
5184    fn test_lineage_nested_cte_star_with_join_and_schema() {
5185        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
5186        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
5187        let sql = r#"WITH
5188base_orders AS (
5189    SELECT * FROM stg_orders
5190),
5191with_payments AS (
5192    SELECT
5193        base_orders.*,
5194        p.amount
5195    FROM base_orders
5196    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
5197),
5198final_cte AS (
5199    SELECT * FROM with_payments
5200)
5201SELECT * FROM final_cte"#;
5202        let expr = parse(sql);
5203
5204        let mut schema = MappingSchema::new();
5205        let order_cols = vec![
5206            (
5207                "order_id".to_string(),
5208                crate::expressions::DataType::Unknown,
5209            ),
5210            (
5211                "customer_id".to_string(),
5212                crate::expressions::DataType::Unknown,
5213            ),
5214            ("status".to_string(), crate::expressions::DataType::Unknown),
5215        ];
5216        let pay_cols = vec![
5217            (
5218                "payment_id".to_string(),
5219                crate::expressions::DataType::Unknown,
5220            ),
5221            (
5222                "order_id".to_string(),
5223                crate::expressions::DataType::Unknown,
5224            ),
5225            ("amount".to_string(), crate::expressions::DataType::Unknown),
5226        ];
5227        schema.add_table("stg_orders", &order_cols, None).unwrap();
5228        schema.add_table("stg_payments", &pay_cols, None).unwrap();
5229
5230        // order_id should trace back to stg_orders
5231        let node =
5232            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5233                .unwrap();
5234        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5235
5236        // The leaf should be "stg_orders.order_id" (not just "order_id")
5237        let has_table_qualified = all_names
5238            .iter()
5239            .any(|n| n.contains('.') && n.contains("order_id"));
5240        assert!(
5241            has_table_qualified,
5242            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
5243            all_names
5244        );
5245
5246        // amount should trace back to stg_payments
5247        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
5248            .unwrap();
5249        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5250
5251        let has_table_qualified = all_names
5252            .iter()
5253            .any(|n| n.contains('.') && n.contains("amount"));
5254        assert!(
5255            has_table_qualified,
5256            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
5257            all_names
5258        );
5259    }
5260
5261    #[test]
5262    fn test_lineage_cte_alias_resolution() {
5263        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
5264        let sql = r#"WITH import_stg_items AS (
5265    SELECT item_id, name, status FROM stg_items
5266)
5267SELECT base.item_id, base.status
5268FROM import_stg_items AS base"#;
5269        let expr = parse(sql);
5270
5271        let node = lineage("item_id", &expr, None, false).unwrap();
5272        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5273        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
5274        assert!(
5275            all_names.iter().any(|n| n == "stg_items.item_id"),
5276            "Expected leaf 'stg_items.item_id', got: {:?}",
5277            all_names
5278        );
5279    }
5280
5281    #[test]
5282    fn test_lineage_cte_alias_with_schema_and_star() {
5283        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
5284        let sql = r#"WITH import_stg AS (
5285    SELECT * FROM stg_items
5286)
5287SELECT base.item_id, base.status
5288FROM import_stg AS base"#;
5289        let expr = parse(sql);
5290
5291        let mut schema = MappingSchema::new();
5292        schema
5293            .add_table(
5294                "stg_items",
5295                &[
5296                    ("item_id".to_string(), DataType::Unknown),
5297                    ("name".to_string(), DataType::Unknown),
5298                    ("status".to_string(), DataType::Unknown),
5299                ],
5300                None,
5301            )
5302            .unwrap();
5303
5304        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
5305            .unwrap();
5306        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5307        assert!(
5308            all_names.iter().any(|n| n == "stg_items.item_id"),
5309            "Expected leaf 'stg_items.item_id', got: {:?}",
5310            all_names
5311        );
5312    }
5313
5314    #[test]
5315    fn test_lineage_cte_alias_with_join() {
5316        // Multiple CTE aliases in a JOIN: each should resolve independently
5317        let sql = r#"WITH
5318    import_users AS (SELECT id, name FROM users),
5319    import_orders AS (SELECT id, user_id, amount FROM orders)
5320SELECT u.name, o.amount
5321FROM import_users AS u
5322LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
5323        let expr = parse(sql);
5324
5325        let node = lineage("name", &expr, None, false).unwrap();
5326        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5327        assert!(
5328            all_names.iter().any(|n| n == "users.name"),
5329            "Expected leaf 'users.name', got: {:?}",
5330            all_names
5331        );
5332
5333        let node = lineage("amount", &expr, None, false).unwrap();
5334        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5335        assert!(
5336            all_names.iter().any(|n| n == "orders.amount"),
5337            "Expected leaf 'orders.amount', got: {:?}",
5338            all_names
5339        );
5340    }
5341
5342    // -----------------------------------------------------------------------
5343    // Quoted CTE name tests — verifying SQL identifier case semantics
5344    // -----------------------------------------------------------------------
5345
5346    #[test]
5347    fn test_lineage_unquoted_cte_case_insensitive() {
5348        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
5349        // MyCte and MYCTE should match.
5350        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
5351        let node = lineage("col", &expr, None, false).unwrap();
5352        assert_eq!(node.name, "col");
5353        assert!(
5354            !node.downstream.is_empty(),
5355            "Unquoted CTE should resolve case-insensitively"
5356        );
5357    }
5358
5359    #[test]
5360    fn test_lineage_quoted_cte_case_preserved() {
5361        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
5362        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
5363        let node = lineage("col", &expr, None, false).unwrap();
5364        assert_eq!(node.name, "col");
5365        assert!(
5366            !node.downstream.is_empty(),
5367            "Quoted CTE with matching case should resolve"
5368        );
5369    }
5370
5371    #[test]
5372    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
5373        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
5374        // sqlglot treats this as a table reference, not a CTE match.
5375        // Star expansion should NOT resolve through the CTE.
5376        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
5377        // lineage("col", ...) should fail because "mycte" is treated as an external
5378        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
5379        let result = lineage("col", &expr, None, false);
5380        assert!(
5381            result.is_err(),
5382            "Quoted CTE with case mismatch should not expand star: {:?}",
5383            result
5384        );
5385    }
5386
5387    #[test]
5388    fn test_lineage_mixed_quoted_unquoted_cte() {
5389        // Mix of unquoted and quoted CTEs in a nested chain.
5390        let expr = parse(
5391            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
5392        );
5393        let node = lineage("a", &expr, None, false).unwrap();
5394        assert_eq!(node.name, "a");
5395        assert!(
5396            !node.downstream.is_empty(),
5397            "Mixed quoted/unquoted CTE chain should resolve"
5398        );
5399    }
5400
5401    // -----------------------------------------------------------------------
5402    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
5403    // -----------------------------------------------------------------------
5404    //
5405    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
5406    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
5407    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
5408    // direct string comparison for CTE name matching, ignoring the quoted status.
5409    //
5410    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
5411    // and unquoted as case-insensitive. The scope system should do the same.
5412    //
5413    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
5414    // which is broader than the star expansion scope of this PR.
5415
5416    #[test]
5417    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
5418        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
5419        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
5420        // as "mycte" incorrectly resolves to the CTE.
5421        //
5422        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
5423        // case-sensitive: "mycte" should NOT match CTE "MyCte".
5424        //
5425        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
5426        // this test should fail — update the assertion to match correct behavior:
5427        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
5428        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
5429        let node = lineage("col", &expr, None, false).unwrap();
5430        assert!(!node.downstream.is_empty());
5431        let child = &node.downstream[0];
5432        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
5433        assert_eq!(
5434            child.source_name, "MyCte",
5435            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5436             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5437        );
5438    }
5439
5440    #[test]
5441    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
5442        // Known bug: same as above but with qualified column reference ("mycte".col).
5443        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
5444        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
5445        //
5446        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
5447        // this test should fail — update to assert source_name != "MyCte".
5448        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
5449        let node = lineage("col", &expr, None, false).unwrap();
5450        assert!(!node.downstream.is_empty());
5451        let child = &node.downstream[0];
5452        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
5453        assert_eq!(
5454            child.source_name, "MyCte",
5455            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5456             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5457        );
5458    }
5459
5460    #[test]
5461    fn test_lineage_recursive_cte_terminates_at_base_case() {
5462        let expr = parse_dialect(
5463            "WITH RECURSIVE nums AS (\
5464             SELECT 1 AS n \
5465             UNION ALL \
5466             SELECT n + 1 FROM nums WHERE n < 5\
5467             ) SELECT n FROM nums",
5468            DialectType::DuckDB,
5469        );
5470        let node = lineage("n", &expr, Some(DialectType::DuckDB), false).unwrap();
5471        let names = lineage_names(&node);
5472
5473        assert!(
5474            names.len() <= 12,
5475            "recursive CTE lineage should not unroll repeatedly, got {names:?}"
5476        );
5477        assert!(
5478            node.walk()
5479                .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "nums"),
5480            "expected recursive source to be marked as a CTE, got {names:?}"
5481        );
5482    }
5483
5484    #[test]
5485    fn test_lineage_window_partition_and_order_columns() {
5486        let expr = parse(
5487            "WITH c AS (SELECT user_id, ts FROM events) \
5488             SELECT ROW_NUMBER() OVER (PARTITION BY c.user_id ORDER BY c.ts) AS out FROM c",
5489        );
5490        let node = lineage("out", &expr, None, false).unwrap();
5491
5492        assert_lineage_contains(&node, "events.user_id");
5493        assert_lineage_contains(&node, "events.ts");
5494    }
5495
5496    #[test]
5497    fn test_lineage_window_aggregate_order_column() {
5498        let expr = parse(
5499            "WITH c AS (SELECT amount, d FROM txns) \
5500             SELECT SUM(c.amount) OVER (ORDER BY c.d) AS running FROM c",
5501        );
5502        let node = lineage("running", &expr, None, false).unwrap();
5503
5504        assert_lineage_contains(&node, "txns.amount");
5505        assert_lineage_contains(&node, "txns.d");
5506    }
5507
5508    #[test]
5509    fn test_lineage_named_window_columns() {
5510        let expr = parse(
5511            "SELECT ROW_NUMBER() OVER w AS out \
5512             FROM events \
5513             WINDOW w AS (PARTITION BY user_id ORDER BY ts)",
5514        );
5515        let node = lineage("out", &expr, None, false).unwrap();
5516
5517        assert_lineage_contains(&node, "events.user_id");
5518        assert_lineage_contains(&node, "events.ts");
5519    }
5520
5521    #[test]
5522    fn test_lineage_within_group_order_column() {
5523        let expr =
5524            parse("SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS p FROM txns");
5525        let node = lineage("p", &expr, None, false).unwrap();
5526
5527        assert_lineage_contains(&node, "txns.amount");
5528    }
5529
5530    #[test]
5531    fn test_lineage_query_wrappers_resolve_inner_select() {
5532        for sql in [
5533            "CREATE TABLE tgt AS SELECT x FROM src",
5534            "CREATE VIEW v AS SELECT x FROM src",
5535            "INSERT INTO tgt SELECT x FROM src",
5536        ] {
5537            let expr = parse(sql);
5538            let node = lineage("x", &expr, None, false).unwrap();
5539            assert_lineage_contains(&node, "src.x");
5540        }
5541    }
5542
5543    #[test]
5544    fn test_lineage_scalar_subquery_through_cte_reaches_base_table() {
5545        let expr = parse(
5546            "WITH c AS (SELECT x FROM t) \
5547             SELECT (SELECT SUM(x) FROM c) AS s FROM c LIMIT 1",
5548        );
5549        let node = lineage("s", &expr, None, false).unwrap();
5550
5551        assert_lineage_contains(&node, "t.x");
5552        assert!(
5553            node.walk()
5554                .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c"),
5555            "expected scalar subquery CTE hop in lineage, got {:?}",
5556            lineage_names(&node)
5557        );
5558    }
5559
5560    #[test]
5561    fn test_lineage_scalar_subqueries_inside_expression_wrappers() {
5562        for sql in [
5563            "WITH c AS (SELECT a, b FROM t) \
5564             SELECT CASE WHEN c.a > 0 THEN c.b ELSE (SELECT MAX(z) FROM o) END AS r FROM c",
5565            "WITH c AS (SELECT a FROM t) \
5566             SELECT COALESCE(c.a, (SELECT MAX(z) FROM o)) AS r FROM c",
5567            "WITH c AS (SELECT a FROM t) \
5568             SELECT CAST((SELECT MAX(z) FROM o) AS INT) + c.a AS r FROM c",
5569            "WITH c AS (SELECT a FROM t) \
5570             SELECT CASE WHEN c.a BETWEEN 0 AND (SELECT MAX(z) FROM o) THEN c.a END AS r FROM c",
5571        ] {
5572            let expr = parse_dialect(sql, DialectType::DuckDB);
5573            let node = lineage("r", &expr, Some(DialectType::DuckDB), false)
5574                .unwrap_or_else(|error| panic!("lineage failed for {sql}: {error}"));
5575
5576            assert_lineage_contains(&node, "o.z");
5577            assert_lineage_contains(&node, "t.a");
5578        }
5579    }
5580
5581    #[test]
5582    fn test_lineage_nested_set_operation_inside_derived_table() {
5583        let expr = parse_dialect(
5584            "SELECT v FROM ((SELECT v FROM t1 UNION ALL SELECT v FROM t2) \
5585             UNION ALL SELECT v FROM t3) u",
5586            DialectType::DuckDB,
5587        );
5588        let node = lineage("v", &expr, Some(DialectType::DuckDB), false).unwrap();
5589
5590        assert_lineage_contains(&node, "t1.v");
5591        assert_lineage_contains(&node, "t2.v");
5592        assert_lineage_contains(&node, "t3.v");
5593    }
5594
5595    #[test]
5596    fn test_lineage_select_alias_reference_resolves_to_alias_source() {
5597        let expr = parse_dialect(
5598            "WITH c AS (SELECT x FROM t) SELECT c.x AS a, a + 1 AS b FROM c",
5599            DialectType::DuckDB,
5600        );
5601        let node = lineage("b", &expr, Some(DialectType::DuckDB), false).unwrap();
5602
5603        assert_lineage_contains(&node, "t.x");
5604    }
5605
5606    #[test]
5607    fn test_lineage_pivot_output_resolves_aggregation_input() {
5608        let expr = parse_dialect(
5609            "SELECT * FROM (SELECT region, q, amt FROM sales) \
5610             PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5611            DialectType::DuckDB,
5612        );
5613        let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5614
5615        assert_lineage_contains(&node, "sales.amt");
5616    }
5617
5618    #[test]
5619    fn test_lineage_pivot_multi_aggregate_and_alias_columns() {
5620        let multi = parse_dialect(
5621            "SELECT * FROM (SELECT category, value, price FROM t) \
5622             PIVOT(SUM(value) AS value_sum, MAX(price) FOR category IN ('a' AS cat_a, 'b'))",
5623            DialectType::DuckDB,
5624        );
5625        let value_sum =
5626            lineage("cat_a_value_sum", &multi, Some(DialectType::DuckDB), false).unwrap();
5627        assert_lineage_contains(&value_sum, "t.value");
5628
5629        let max_price =
5630            lineage("cat_a_max(price)", &multi, Some(DialectType::DuckDB), false).unwrap();
5631        assert_lineage_contains(&max_price, "t.price");
5632
5633        let aliased = parse_dialect(
5634            "SELECT * FROM (SELECT region, q, amt FROM sales) \
5635             PIVOT(SUM(amt) FOR q IN ('Q1')) AS p(region2, p1)",
5636            DialectType::DuckDB,
5637        );
5638        let region = lineage("region2", &aliased, Some(DialectType::DuckDB), false).unwrap();
5639        assert_lineage_contains(&region, "sales.region");
5640
5641        let pivot_value = lineage("p1", &aliased, Some(DialectType::DuckDB), false).unwrap();
5642        assert_lineage_contains(&pivot_value, "sales.amt");
5643    }
5644
5645    #[test]
5646    fn test_lineage_pivot_through_cte_resolves_aggregation_input() {
5647        let expr = parse_dialect(
5648            "WITH src AS (SELECT region, q, amt FROM sales) \
5649             SELECT q1 FROM src PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5650            DialectType::DuckDB,
5651        );
5652        let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5653
5654        assert_lineage_contains(&node, "sales.amt");
5655    }
5656
5657    #[test]
5658    fn test_lineage_unpivot_value_resolves_input_columns() {
5659        let expr = parse_dialect(
5660            "SELECT name, val FROM t UNPIVOT(val FOR col IN (a, b, c))",
5661            DialectType::DuckDB,
5662        );
5663        let node = lineage("val", &expr, Some(DialectType::DuckDB), false).unwrap();
5664
5665        assert_lineage_contains(&node, "t.a");
5666        assert_lineage_contains(&node, "t.b");
5667        assert_lineage_contains(&node, "t.c");
5668    }
5669
5670    #[test]
5671    fn test_lineage_unpivot_multi_value_columns_resolve_positionally() {
5672        let expr = parse_dialect(
5673            "SELECT first_half_sales, second_half_sales, semester \
5674             FROM produce \
5675             UNPIVOT((first_half_sales, second_half_sales) \
5676             FOR semester IN ((q1, q2) AS 'semester_1', (q3, q4) AS 'semester_2'))",
5677            DialectType::BigQuery,
5678        );
5679
5680        let first = lineage(
5681            "first_half_sales",
5682            &expr,
5683            Some(DialectType::BigQuery),
5684            false,
5685        )
5686        .unwrap();
5687        assert_lineage_contains(&first, "produce.q1");
5688        assert_lineage_contains(&first, "produce.q3");
5689
5690        let second = lineage(
5691            "second_half_sales",
5692            &expr,
5693            Some(DialectType::BigQuery),
5694            false,
5695        )
5696        .unwrap();
5697        assert_lineage_contains(&second, "produce.q2");
5698        assert_lineage_contains(&second, "produce.q4");
5699    }
5700
5701    #[test]
5702    fn test_lineage_top_level_union_over_ctes_reaches_base_tables() {
5703        let expr = parse(
5704            "WITH a AS (SELECT x FROM t1), b AS (SELECT x FROM t2) \
5705             SELECT x FROM a UNION SELECT x FROM b",
5706        );
5707        let node = lineage("x", &expr, None, false).unwrap();
5708
5709        assert_lineage_contains(&node, "t1.x");
5710        assert_lineage_contains(&node, "t2.x");
5711    }
5712
5713    #[test]
5714    fn test_lineage_star_excludes_semi_join_rhs_source() {
5715        let expr = parse_dialect(
5716            "SELECT * FROM orders LEFT SEMI JOIN customers ON orders.customer_id = customers.id",
5717            DialectType::DuckDB,
5718        );
5719        let node = lineage("customer_id", &expr, Some(DialectType::DuckDB), false).unwrap();
5720
5721        assert_lineage_contains(&node, "orders.customer_id");
5722    }
5723
5724    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
5725
5726    /// sqlglot: test_node_name_doesnt_contain_comment
5727    /// Comments in column expressions should not affect lineage resolution.
5728    /// NOTE: This test uses SELECT * from a derived table, which is a separate
5729    /// known limitation in polyglot-sql (star expansion in subqueries).
5730    #[test]
5731    #[ignore = "requires derived table star expansion (separate issue)"]
5732    fn test_node_name_doesnt_contain_comment() {
5733        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
5734        let node = lineage("x", &expr, None, false).unwrap();
5735
5736        assert_eq!(node.name, "x");
5737        assert!(!node.downstream.is_empty());
5738    }
5739
5740    /// A line comment between SELECT and the first column wraps the column
5741    /// in an Annotated node. Lineage must unwrap it to find the column name.
5742    /// Verify that commented and uncommented queries produce identical lineage.
5743    #[test]
5744    fn test_comment_before_first_column_in_cte() {
5745        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
5746        let sql_without_comment = "with t as (select 1 as a) select a from t";
5747
5748        // Without comment — baseline
5749        let expr_ok = parse(sql_without_comment);
5750        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
5751
5752        // With comment — should produce identical lineage
5753        let expr_comment = parse(sql_with_comment);
5754        let node_comment = lineage("a", &expr_comment, None, false)
5755            .expect("with comment before first column should succeed");
5756
5757        assert_eq!(node_ok.name, node_comment.name, "node names should match");
5758        assert_eq!(
5759            node_ok.downstream_names(),
5760            node_comment.downstream_names(),
5761            "downstream lineage should be identical with or without comment"
5762        );
5763    }
5764
5765    /// Block comment between SELECT and first column.
5766    #[test]
5767    fn test_block_comment_before_first_column() {
5768        let sql = "with t as (select 1 as a) select /* section */ a from t";
5769        let expr = parse(sql);
5770        let node = lineage("a", &expr, None, false)
5771            .expect("block comment before first column should succeed");
5772        assert_eq!(node.name, "a");
5773        assert!(
5774            !node.downstream.is_empty(),
5775            "should have downstream lineage"
5776        );
5777    }
5778
5779    /// Comment before first column should not affect second column resolution.
5780    #[test]
5781    fn test_comment_before_first_column_second_col_ok() {
5782        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
5783        let expr = parse(sql);
5784
5785        let node_a =
5786            lineage("a", &expr, None, false).expect("column a with comment should succeed");
5787        assert_eq!(node_a.name, "a");
5788
5789        let node_b =
5790            lineage("b", &expr, None, false).expect("column b with comment should succeed");
5791        assert_eq!(node_b.name, "b");
5792    }
5793
5794    /// Aliased column with preceding comment.
5795    #[test]
5796    fn test_comment_before_aliased_column() {
5797        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
5798        let expr = parse(sql);
5799        let node =
5800            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
5801        assert_eq!(node.name, "y");
5802        assert!(
5803            !node.downstream.is_empty(),
5804            "aliased column should have downstream lineage"
5805        );
5806    }
5807}