Skip to main content

polyglot_sql/
lineage.rs

1//! Column Lineage Tracking
2//!
3//! This module provides functionality to track column lineage through SQL queries,
4//! building a graph of how columns flow from source tables to the result set.
5//! Supports UNION/INTERSECT/EXCEPT, CTEs, derived tables, subqueries, and star expansion.
6//!
7
8use crate::dialects::DialectType;
9use crate::expressions::{Expression, Identifier, JoinKind, NamedWindow, Select};
10use crate::generator::Generator;
11use crate::optimizer::annotate_types::annotate_types;
12use crate::optimizer::qualify_columns::{qualify_columns, QualifyColumnsOptions};
13use crate::schema::{normalize_name, Schema};
14use crate::scope::{
15    build_scope, find_all_in_scope, Scope, ScopeType, SourceInfo as ScopeSourceInfo, SourceKind,
16};
17use crate::{Error, Result};
18use serde::{Deserialize, Serialize};
19use std::collections::{HashMap, HashSet};
20
21/// A node in the column lineage graph
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct LineageNode {
24    /// Name of this lineage step (e.g., "table.column")
25    pub name: String,
26    /// The expression at this node
27    pub expression: Expression,
28    /// The source expression (the full query context)
29    pub source: Expression,
30    /// Downstream nodes that depend on this one
31    pub downstream: Vec<LineageNode>,
32    /// Optional source name (e.g., for derived tables)
33    pub source_name: String,
34    /// Semantic source kind for downstream consumers.
35    pub source_kind: SourceKind,
36    /// User-written source alias when different from canonical source name.
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub source_alias: Option<String>,
39    /// Optional reference node name (e.g., for CTEs)
40    pub reference_node_name: String,
41}
42
43impl LineageNode {
44    /// Create a new lineage node
45    pub fn new(name: impl Into<String>, expression: Expression, source: Expression) -> Self {
46        Self {
47            name: name.into(),
48            expression,
49            source,
50            downstream: Vec::new(),
51            source_name: String::new(),
52            source_kind: SourceKind::Unknown,
53            source_alias: None,
54            reference_node_name: String::new(),
55        }
56    }
57
58    /// Iterate over all nodes in the lineage graph using DFS
59    pub fn walk(&self) -> LineageWalker<'_> {
60        LineageWalker { stack: vec![self] }
61    }
62
63    /// Get all downstream column names
64    pub fn downstream_names(&self) -> Vec<String> {
65        self.downstream.iter().map(|n| n.name.clone()).collect()
66    }
67}
68
69fn source_kind_for_scope_context(
70    scope: &Scope,
71    source_name: &str,
72    reference_node_name: &str,
73) -> SourceKind {
74    if source_name.is_empty() && reference_node_name.is_empty() {
75        return SourceKind::Root;
76    }
77    if let Some(source_info) = scope.sources.get(source_name) {
78        return source_info.kind;
79    }
80    if scope.cte_sources.contains_key(source_name) {
81        return SourceKind::Cte;
82    }
83    match scope.scope_type {
84        ScopeType::Cte => SourceKind::Cte,
85        ScopeType::DerivedTable => SourceKind::DerivedTable,
86        ScopeType::Udtf => SourceKind::Virtual,
87        _ => SourceKind::Unknown,
88    }
89}
90
91fn apply_scope_context(
92    node: &mut LineageNode,
93    scope: &Scope,
94    source_name: &str,
95    reference_node_name: &str,
96) {
97    node.source_name = source_name.to_string();
98    node.reference_node_name = reference_node_name.to_string();
99    node.source_kind = source_kind_for_scope_context(scope, source_name, reference_node_name);
100}
101
102/// Iterator for walking the lineage graph
103pub struct LineageWalker<'a> {
104    stack: Vec<&'a LineageNode>,
105}
106
107impl<'a> Iterator for LineageWalker<'a> {
108    type Item = &'a LineageNode;
109
110    fn next(&mut self) -> Option<Self::Item> {
111        if let Some(node) = self.stack.pop() {
112            // Add children in reverse order so they're visited in order
113            for child in node.downstream.iter().rev() {
114                self.stack.push(child);
115            }
116            Some(node)
117        } else {
118            None
119        }
120    }
121}
122
123// ---------------------------------------------------------------------------
124// ColumnRef: name or positional index for column lookup
125// ---------------------------------------------------------------------------
126
/// Identifies the output column whose lineage is being traced.
enum ColumnRef<'a> {
    /// Look the column up by name.
    Name(&'a str),
    /// Look the column up by positional index.
    Index(usize),
}
132
133// ---------------------------------------------------------------------------
134// Public API
135// ---------------------------------------------------------------------------
136
137/// Build the lineage graph for a column in a SQL query
138///
139/// # Arguments
140/// * `column` - The column name to trace lineage for
141/// * `sql` - The SQL expression (SELECT, UNION, etc.)
142/// * `dialect` - Optional dialect for parsing
143/// * `trim_selects` - If true, trim the source SELECT to only include the target column
144///
145/// # Returns
146/// The root lineage node for the specified column
147///
148/// # Example
149/// ```ignore
150/// use polyglot_sql::lineage::lineage;
151/// use polyglot_sql::parse_one;
152/// use polyglot_sql::DialectType;
153///
154/// let sql = "SELECT a, b + 1 AS c FROM t";
155/// let expr = parse_one(sql, DialectType::Generic).unwrap();
156/// let node = lineage("c", &expr, None, false).unwrap();
157/// ```
158pub fn lineage(
159    column: &str,
160    sql: &Expression,
161    dialect: Option<DialectType>,
162    trim_selects: bool,
163) -> Result<LineageNode> {
164    let mut owned = lineage_normalized_expression(sql);
165    // Fast path: skip clone when there are no CTEs to expand
166    if has_lineage_with_clause(&owned) {
167        expand_cte_stars(&mut owned, None);
168    }
169    lineage_from_expression(column, &owned, dialect, trim_selects)
170}
171
172/// Build the lineage graph for a column in a SQL query using optional schema metadata.
173///
174/// When `schema` is provided, the query is first qualified with
175/// `optimizer::qualify_columns`, allowing more accurate lineage for unqualified or
176/// ambiguous column references.
177///
178/// # Arguments
179/// * `column` - The column name to trace lineage for
180/// * `sql` - The SQL expression (SELECT, UNION, etc.)
181/// * `schema` - Optional schema used for qualification
182/// * `dialect` - Optional dialect for qualification and lineage handling
183/// * `trim_selects` - If true, trim the source SELECT to only include the target column
184///
185/// # Returns
186/// The root lineage node for the specified column
187pub fn lineage_with_schema(
188    column: &str,
189    sql: &Expression,
190    schema: Option<&dyn Schema>,
191    dialect: Option<DialectType>,
192    trim_selects: bool,
193) -> Result<LineageNode> {
194    let normalized_expression = lineage_normalized_expression(sql);
195    let mut qualified_expression = if let Some(schema) = schema {
196        let options = if let Some(dialect_type) = dialect.or_else(|| schema.dialect()) {
197            QualifyColumnsOptions::new()
198                .with_dialect(dialect_type)
199                .with_allow_partial(true)
200        } else {
201            QualifyColumnsOptions::new().with_allow_partial(true)
202        };
203
204        qualify_columns(normalized_expression.clone(), schema, &options).map_err(|e| {
205            Error::internal(format!("Lineage qualification failed with schema: {}", e))
206        })?
207    } else {
208        normalized_expression
209    };
210
211    // Annotate types in-place so lineage nodes carry type information
212    annotate_types(&mut qualified_expression, schema, dialect);
213
214    // Expand CTE stars on the already-owned expression (no extra clone).
215    // Pass schema so that stars from external tables can also be resolved.
216    expand_cte_stars(&mut qualified_expression, schema);
217
218    lineage_from_expression(column, &qualified_expression, dialect, trim_selects)
219}
220
221fn lineage_from_expression(
222    column: &str,
223    sql: &Expression,
224    dialect: Option<DialectType>,
225    trim_selects: bool,
226) -> Result<LineageNode> {
227    let scope = build_scope(sql);
228    to_node(
229        ColumnRef::Name(column),
230        &scope,
231        dialect,
232        "",
233        "",
234        "",
235        trim_selects,
236    )
237}
238
239pub(crate) fn lineage_by_index_from_expression(
240    column_index: usize,
241    sql: &Expression,
242    dialect: Option<DialectType>,
243    trim_selects: bool,
244) -> Result<LineageNode> {
245    let normalized = lineage_normalized_expression(sql);
246    let scope = build_scope(&normalized);
247    to_node(
248        ColumnRef::Index(column_index),
249        &scope,
250        dialect,
251        "",
252        "",
253        "",
254        trim_selects,
255    )
256}
257
258fn lineage_normalized_expression(sql: &Expression) -> Expression {
259    match sql {
260        Expression::Prepare(prepare) => lineage_normalized_expression(&prepare.statement),
261        Expression::CreateTable(create) => create
262            .as_select
263            .as_ref()
264            .map(|query| attach_with_to_query(query.clone(), create.with_cte.clone()))
265            .unwrap_or_else(|| sql.clone()),
266        Expression::CreateView(create) => lineage_normalized_expression(&create.query),
267        Expression::Insert(insert) => insert
268            .query
269            .as_ref()
270            .map(|query| attach_with_to_query(query.clone(), insert.with.clone()))
271            .unwrap_or_else(|| sql.clone()),
272        _ => sql.clone(),
273    }
274}
275
276fn attach_with_to_query(
277    mut query: Expression,
278    with: Option<crate::expressions::With>,
279) -> Expression {
280    if let Some(with) = with {
281        attach_with_to_query_mut(&mut query, with);
282    }
283    query
284}
285
286fn attach_with_to_query_mut(query: &mut Expression, with: crate::expressions::With) {
287    match query {
288        Expression::Select(select) => {
289            if select.with.is_none() {
290                select.with = Some(with);
291            }
292        }
293        Expression::Union(union) => {
294            if union.with.is_none() {
295                union.with = Some(with);
296            }
297        }
298        Expression::Intersect(intersect) => {
299            if intersect.with.is_none() {
300                intersect.with = Some(with);
301            }
302        }
303        Expression::Except(except) => {
304            if except.with.is_none() {
305                except.with = Some(with);
306            }
307        }
308        Expression::Paren(paren) => attach_with_to_query_mut(&mut paren.this, with),
309        _ => {}
310    }
311}
312
313fn has_lineage_with_clause(expr: &Expression) -> bool {
314    match expr {
315        Expression::Select(select) => select.with.is_some(),
316        Expression::Union(union) => {
317            union.with.is_some()
318                || has_lineage_with_clause(&union.left)
319                || has_lineage_with_clause(&union.right)
320        }
321        Expression::Intersect(intersect) => {
322            intersect.with.is_some()
323                || has_lineage_with_clause(&intersect.left)
324                || has_lineage_with_clause(&intersect.right)
325        }
326        Expression::Except(except) => {
327            except.with.is_some()
328                || has_lineage_with_clause(&except.left)
329                || has_lineage_with_clause(&except.right)
330        }
331        Expression::Paren(paren) => has_lineage_with_clause(&paren.this),
332        _ => false,
333    }
334}
335
336// ---------------------------------------------------------------------------
337// CTE star expansion
338// ---------------------------------------------------------------------------
339
340/// Normalize an identifier for CTE name matching.
341///
342/// Follows SQL semantics: unquoted identifiers are case-insensitive (lowercased),
343/// quoted identifiers preserve their original case. This matches sqlglot's
344/// `normalize_identifiers` behavior.
345fn normalize_cte_name(ident: &Identifier) -> String {
346    if ident.quoted {
347        ident.name.clone()
348    } else {
349        ident.name.to_lowercase()
350    }
351}
352
353/// Expand SELECT * in CTEs by walking CTE definitions in order and propagating
354/// resolved column lists. This handles nested CTEs (e.g., cte2 AS (SELECT * FROM cte1))
355/// which qualify_columns cannot resolve because it processes each SELECT independently.
356///
357/// When `schema` is provided, stars from external tables (not CTEs) are also resolved
358/// by looking up column names in the schema. This enables correct expansion of patterns
359/// like `WITH cte AS (SELECT * FROM external_table) SELECT * FROM cte`.
360///
361/// CTE name matching follows SQL identifier semantics: unquoted names are compared
362/// case-insensitively (lowercased), while quoted names preserve their original case.
363/// This matches sqlglot's `normalize_identifiers` behavior.
364pub fn expand_cte_stars(expr: &mut Expression, schema: Option<&dyn Schema>) {
365    if let Expression::Prepare(prepare) = expr {
366        expand_cte_stars(&mut prepare.statement, schema);
367        return;
368    }
369
370    let select = match expr {
371        Expression::Select(s) => s,
372        _ => return,
373    };
374
375    let with = match &mut select.with {
376        Some(w) => w,
377        None => return,
378    };
379
380    let mut resolved_cte_columns: HashMap<String, Vec<String>> = HashMap::new();
381
382    for cte in &mut with.ctes {
383        let cte_name = normalize_cte_name(&cte.alias);
384
385        // If CTE has explicit column list (e.g., cte(a, b) AS (...)), use that
386        if !cte.columns.is_empty() {
387            let cols: Vec<String> = cte.columns.iter().map(|c| c.name.clone()).collect();
388            resolved_cte_columns.insert(cte_name, cols);
389            continue;
390        }
391
392        // Skip recursive CTEs (self-referencing) — their column resolution is complex.
393        // A CTE is recursive if the WITH block is marked recursive AND the CTE body
394        // references itself. We detect this conservatively: if the CTE name appears as
395        // a source in its own body, skip it. Non-recursive CTEs in a recursive WITH
396        // block are still expanded.
397        if with.recursive {
398            let is_self_referencing =
399                if let Some(body_select) = get_leftmost_select_mut(&mut cte.this) {
400                    let body_sources = get_select_sources(body_select);
401                    body_sources.iter().any(|s| s.normalized == cte_name)
402                } else {
403                    false
404                };
405            if is_self_referencing {
406                continue;
407            }
408        }
409
410        // Get the SELECT from the CTE body (handle UNION by taking left branch)
411        let body_select = match get_leftmost_select_mut(&mut cte.this) {
412            Some(s) => s,
413            None => continue,
414        };
415
416        let columns = rewrite_stars_in_select(body_select, &resolved_cte_columns, schema);
417        resolved_cte_columns.insert(cte_name, columns);
418    }
419
420    // Also expand stars in the outer SELECT itself
421    rewrite_stars_in_select(select, &resolved_cte_columns, schema);
422}
423
424/// Get the leftmost SELECT from an expression, drilling through UNION/INTERSECT/EXCEPT.
425///
426/// Per the SQL standard, the column names of a set operation (UNION, INTERSECT, EXCEPT)
427/// are determined by the left branch. This matches sqlglot's behavior.
428fn get_leftmost_select_mut(expr: &mut Expression) -> Option<&mut Select> {
429    let mut current = expr;
430    for _ in 0..MAX_LINEAGE_DEPTH {
431        match current {
432            Expression::Select(s) => return Some(s),
433            Expression::Union(u) => current = &mut u.left,
434            Expression::Intersect(i) => current = &mut i.left,
435            Expression::Except(e) => current = &mut e.left,
436            Expression::Paren(p) => current = &mut p.this,
437            _ => return None,
438        }
439    }
440    None
441}
442
443/// Rewrite star expressions in a SELECT using resolved CTE column lists.
444/// Falls back to `schema` for external table column lookup.
445/// Returns the list of output column names after expansion.
446fn rewrite_stars_in_select(
447    select: &mut Select,
448    resolved_ctes: &HashMap<String, Vec<String>>,
449    schema: Option<&dyn Schema>,
450) -> Vec<String> {
451    // The AST represents star expressions in two forms depending on syntax:
452    //   - `SELECT *`      → Expression::Star (unqualified star)
453    //   - `SELECT table.*` → Expression::Column { name: "*", table: Some(...) } (qualified star)
454    // Both must be checked to handle all star patterns.
455    let has_star = select
456        .expressions
457        .iter()
458        .any(|e| matches!(e, Expression::Star(_)));
459    let has_qualified_star = select
460        .expressions
461        .iter()
462        .any(|e| matches!(e, Expression::Column(c) if c.name.name == "*"));
463
464    if !has_star && !has_qualified_star {
465        // No stars — just extract column names without rewriting
466        return select
467            .expressions
468            .iter()
469            .filter_map(get_expression_output_name)
470            .collect();
471    }
472
473    let sources = get_select_sources(select);
474    let mut new_expressions = Vec::new();
475    let mut result_columns = Vec::new();
476
477    for expr in &select.expressions {
478        match expr {
479            Expression::Star(star) => {
480                let qual = star.table.as_ref();
481                if let Some(expanded) =
482                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
483                {
484                    for (src_alias, col_name) in &expanded {
485                        let table_id = Identifier::new(src_alias);
486                        new_expressions.push(make_column_expr(col_name, Some(&table_id)));
487                        result_columns.push(col_name.clone());
488                    }
489                } else {
490                    new_expressions.push(expr.clone());
491                    result_columns.push("*".to_string());
492                }
493            }
494            Expression::Column(c) if c.name.name == "*" => {
495                let qual = c.table.as_ref();
496                if let Some(expanded) =
497                    expand_star_from_sources(qual, &sources, resolved_ctes, schema)
498                {
499                    for (_src_alias, col_name) in &expanded {
500                        // Keep the original table qualifier for qualified stars (table.*)
501                        new_expressions.push(make_column_expr(col_name, c.table.as_ref()));
502                        result_columns.push(col_name.clone());
503                    }
504                } else {
505                    new_expressions.push(expr.clone());
506                    result_columns.push("*".to_string());
507                }
508            }
509            _ => {
510                new_expressions.push(expr.clone());
511                if let Some(name) = get_expression_output_name(expr) {
512                    result_columns.push(name);
513                }
514            }
515        }
516    }
517
518    select.expressions = new_expressions;
519    result_columns
520}
521
522/// Try to expand a star expression by looking up source columns from resolved CTEs,
523/// falling back to the schema for external tables.
524/// Returns (source_alias, column_name) pairs so the caller can set table qualifiers.
525/// `qualifier`: Optional table qualifier (for `table.*`). If None, expand all sources.
526fn expand_star_from_sources(
527    qualifier: Option<&Identifier>,
528    sources: &[SourceInfo],
529    resolved_ctes: &HashMap<String, Vec<String>>,
530    schema: Option<&dyn Schema>,
531) -> Option<Vec<(String, String)>> {
532    let mut expanded = Vec::new();
533
534    if let Some(qual) = qualifier {
535        // Qualified star: table.*
536        let qual_normalized = normalize_cte_name(qual);
537        for src in sources {
538            if src.normalized == qual_normalized || src.alias.to_lowercase() == qual_normalized {
539                // Try CTE first
540                if let Some(cols) = resolved_ctes.get(&src.normalized) {
541                    expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
542                    return Some(expanded);
543                }
544                // Fall back to schema
545                if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
546                    expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
547                    return Some(expanded);
548                }
549            }
550        }
551        None
552    } else {
553        // Unqualified star: expand all sources.
554        // Intentionally conservative: if any source can't be resolved, the entire
555        // expansion is aborted. Partial expansion would produce an incomplete column
556        // list, causing downstream lineage resolution to silently omit columns.
557        // This matches sqlglot's behavior (raises SqlglotError when schema is missing).
558        let mut any_expanded = false;
559        for src in sources {
560            if let Some(cols) = resolved_ctes.get(&src.normalized) {
561                expanded.extend(cols.iter().map(|c| (src.alias.clone(), c.clone())));
562                any_expanded = true;
563            } else if let Some(cols) = lookup_schema_columns(schema, &src.fq_name) {
564                expanded.extend(cols.into_iter().map(|c| (src.alias.clone(), c)));
565                any_expanded = true;
566            } else {
567                return None;
568            }
569        }
570        if any_expanded {
571            Some(expanded)
572        } else {
573            None
574        }
575    }
576}
577
578/// Look up column names for a table from the schema.
579fn lookup_schema_columns(schema: Option<&dyn Schema>, fq_name: &str) -> Option<Vec<String>> {
580    let schema = schema?;
581    if fq_name.is_empty() {
582        return None;
583    }
584    schema
585        .column_names(fq_name)
586        .ok()
587        .filter(|cols| !cols.is_empty() && !cols.contains(&"*".to_string()))
588}
589
590/// Create a Column expression with the given name and optional table qualifier.
591fn make_column_expr(name: &str, table: Option<&Identifier>) -> Expression {
592    Expression::Column(Box::new(crate::expressions::Column {
593        name: Identifier::new(name),
594        table: table.cloned(),
595        join_mark: false,
596        trailing_comments: Vec::new(),
597        span: None,
598        inferred_type: None,
599    }))
600}
601
602/// Extract the output name of a SELECT expression.
603fn get_expression_output_name(expr: &Expression) -> Option<String> {
604    match expr {
605        Expression::Alias(a) => Some(a.alias.name.clone()),
606        Expression::Column(c) => Some(c.name.name.clone()),
607        Expression::Identifier(id) => Some(id.name.clone()),
608        Expression::Star(_) => Some("*".to_string()),
609        _ => None,
610    }
611}
612
/// Description of one relation found in a SELECT's FROM/JOIN clauses, gathered
/// in a single pass.
struct SourceInfo {
    /// Name the source is referred to by: its alias when one was written,
    /// otherwise the source's own name.
    alias: String,
    /// Whether this source was introduced through a quoted identifier.
    ///
    /// The schema-less star passthrough heuristic must stay conservative for
    /// quoted sources because unresolved quoted table names can be distinct
    /// from similarly named CTEs that older scope paths still compare
    /// case-insensitively.
    quoted: bool,
    /// Normalized name for CTE lookup: unquoted → lowercased, quoted → as-is.
    normalized: String,
    /// Fully-qualified table name for schema lookup (e.g., "db.schema.table").
    fq_name: String,
}
628
629/// Extract source info (alias, normalized CTE name, fully-qualified name) from a
630/// SELECT's FROM and JOIN clauses in a single pass.
631fn get_select_sources(select: &Select) -> Vec<SourceInfo> {
632    let mut sources = Vec::new();
633
634    fn extract_source(expr: &Expression) -> Option<SourceInfo> {
635        fn virtual_source_info(alias: &Identifier) -> SourceInfo {
636            SourceInfo {
637                alias: alias.name.clone(),
638                quoted: alias.quoted,
639                normalized: normalize_cte_name(alias),
640                fq_name: alias.name.clone(),
641            }
642        }
643
644        fn named_virtual_source_info(alias: &str) -> SourceInfo {
645            SourceInfo {
646                alias: alias.to_string(),
647                quoted: false,
648                normalized: alias.to_lowercase(),
649                fq_name: alias.to_string(),
650            }
651        }
652
653        match expr {
654            Expression::Table(t) => {
655                let normalized = normalize_cte_name(&t.name);
656                let alias = t
657                    .alias
658                    .as_ref()
659                    .map(|a| a.name.clone())
660                    .unwrap_or_else(|| t.name.name.clone());
661                let mut parts = Vec::new();
662                if let Some(catalog) = &t.catalog {
663                    parts.push(catalog.name.clone());
664                }
665                if let Some(schema) = &t.schema {
666                    parts.push(schema.name.clone());
667                }
668                parts.push(t.name.name.clone());
669                let fq_name = parts.join(".");
670                Some(SourceInfo {
671                    alias,
672                    quoted: t.name.quoted,
673                    normalized,
674                    fq_name,
675                })
676            }
677            Expression::Subquery(s) => {
678                let alias_identifier = s.alias.as_ref()?;
679                let alias = alias_identifier.name.clone();
680                let normalized = alias.to_lowercase();
681                let fq_name = alias.clone();
682                Some(SourceInfo {
683                    alias,
684                    quoted: alias_identifier.quoted,
685                    normalized,
686                    fq_name,
687                })
688            }
689            Expression::Unnest(u) => u.alias.as_ref().map(virtual_source_info),
690            Expression::Alias(a) if matches!(&a.this, Expression::Unnest(_)) => {
691                Some(virtual_source_info(&a.alias))
692            }
693            Expression::Alias(a) if is_query_like_relation(&a.this) => {
694                Some(virtual_source_info(&a.alias))
695            }
696            Expression::Lateral(lateral) => lateral.alias.as_deref().map(named_virtual_source_info),
697            Expression::LateralView(lateral_view) => lateral_view
698                .table_alias
699                .as_ref()
700                .or_else(|| lateral_view.column_aliases.first())
701                .map(virtual_source_info),
702            Expression::Pivot(pivot) => {
703                let alias = pivot_lineage_source_name(
704                    &pivot.this,
705                    pivot.alias.as_ref().map(|alias| alias.name.as_str()),
706                );
707                Some(SourceInfo {
708                    alias: alias.clone(),
709                    quoted: false,
710                    normalized: alias.to_lowercase(),
711                    fq_name: alias,
712                })
713            }
714            Expression::Unpivot(unpivot) => {
715                let alias = pivot_lineage_source_name(
716                    &unpivot.this,
717                    unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
718                );
719                Some(SourceInfo {
720                    alias: alias.clone(),
721                    quoted: false,
722                    normalized: alias.to_lowercase(),
723                    fq_name: alias,
724                })
725            }
726            Expression::Paren(p) => extract_source(&p.this),
727            _ => None,
728        }
729    }
730
731    if let Some(from) = &select.from {
732        for expr in &from.expressions {
733            if let Some(info) = extract_source(expr) {
734                sources.push(info);
735            }
736        }
737    }
738    for join in &select.joins {
739        if is_semi_or_anti_join_kind(join.kind) {
740            continue;
741        }
742        if let Some(info) = extract_source(&join.this) {
743            sources.push(info);
744        }
745    }
746    for lateral_view in &select.lateral_views {
747        if let Some(info) = extract_source(&Expression::LateralView(Box::new(lateral_view.clone())))
748        {
749            sources.push(info);
750        }
751    }
752    sources
753}
754
755fn pivot_lineage_source_name(source: &Expression, explicit_alias: Option<&str>) -> String {
756    if let Some(alias) = explicit_alias {
757        return alias.to_string();
758    }
759
760    match source {
761        Expression::Table(table) => table
762            .alias
763            .as_ref()
764            .map(|alias| alias.name.clone())
765            .unwrap_or_else(|| table.name.name.clone()),
766        Expression::Subquery(subquery) => subquery
767            .alias
768            .as_ref()
769            .map(|alias| alias.name.clone())
770            .unwrap_or_else(|| "_0".to_string()),
771        Expression::Paren(paren) => pivot_lineage_source_name(&paren.this, explicit_alias),
772        _ => "_0".to_string(),
773    }
774}
775
776/// Get all source tables from a lineage graph
777pub fn get_source_tables(node: &LineageNode) -> HashSet<String> {
778    let mut tables = HashSet::new();
779    collect_source_tables(node, &mut tables);
780    tables
781}
782
783/// Recursively collect source table names from lineage graph
784pub fn collect_source_tables(node: &LineageNode, tables: &mut HashSet<String>) {
785    if let Expression::Table(table) = &node.source {
786        tables.insert(table.name.name.clone());
787    }
788    for child in &node.downstream {
789        collect_source_tables(child, tables);
790    }
791}
792
793// ---------------------------------------------------------------------------
794// Core recursive lineage builder
795// ---------------------------------------------------------------------------
796
/// Upper bound on recursion depth while tracing lineage, guarding against stack
/// overflow on circular or deeply nested CTE chains. Also used as the step
/// limit when drilling down to the leftmost SELECT of a set operation.
const MAX_LINEAGE_DEPTH: usize = 64;
800
801/// Recursively build a lineage node for a column in a scope.
802fn to_node(
803    column: ColumnRef<'_>,
804    scope: &Scope,
805    dialect: Option<DialectType>,
806    scope_name: &str,
807    source_name: &str,
808    reference_node_name: &str,
809    trim_selects: bool,
810) -> Result<LineageNode> {
811    to_node_inner(
812        column,
813        scope,
814        dialect,
815        scope_name,
816        source_name,
817        reference_node_name,
818        trim_selects,
819        &[],
820        0,
821    )
822}
823
824fn to_node_inner(
825    column: ColumnRef<'_>,
826    scope: &Scope,
827    dialect: Option<DialectType>,
828    scope_name: &str,
829    source_name: &str,
830    reference_node_name: &str,
831    trim_selects: bool,
832    ancestor_cte_scopes: &[Scope],
833    depth: usize,
834) -> Result<LineageNode> {
835    if depth > MAX_LINEAGE_DEPTH {
836        return Err(Error::internal(format!(
837            "lineage recursion depth exceeded (>{MAX_LINEAGE_DEPTH}) — possible circular CTE reference for scope '{scope_name}'"
838        )));
839    }
840    let scope_expr = &scope.expression;
841
842    // Build combined CTE scopes: current scope's cte_scopes + ancestors
843    let mut all_cte_scopes: Vec<&Scope> = scope.cte_scopes.iter().collect();
844    for s in ancestor_cte_scopes {
845        all_cte_scopes.push(s);
846    }
847    let descendant_cte_scopes = descendant_cte_scope_clones(&all_cte_scopes, scope);
848
849    // 0. Unwrap CTE scope — CTE scope expressions are Expression::Cte(...)
850    //    but we need the inner query (SELECT/UNION) for column lookup.
851    let effective_expr = effective_scope_expression(scope_expr);
852
853    // 1. Set operations (UNION / INTERSECT / EXCEPT)
854    if matches!(
855        effective_expr,
856        Expression::Union(_) | Expression::Intersect(_) | Expression::Except(_)
857    ) {
858        // For CTE wrapping a set op, create a temporary scope with the inner expression
859        if matches!(scope_expr, Expression::Cte(_)) {
860            let mut inner_scope = Scope::new(effective_expr.clone());
861            inner_scope.union_scopes = scope.union_scopes.clone();
862            inner_scope.sources = scope.sources.clone();
863            inner_scope.cte_sources = scope.cte_sources.clone();
864            inner_scope.cte_scopes = scope.cte_scopes.clone();
865            inner_scope.derived_table_scopes = scope.derived_table_scopes.clone();
866            inner_scope.subquery_scopes = scope.subquery_scopes.clone();
867            return handle_set_operation(
868                &column,
869                &inner_scope,
870                dialect,
871                scope_name,
872                source_name,
873                reference_node_name,
874                trim_selects,
875                &descendant_cte_scopes,
876                depth,
877            );
878        }
879        return handle_set_operation(
880            &column,
881            scope,
882            dialect,
883            scope_name,
884            source_name,
885            reference_node_name,
886            trim_selects,
887            &descendant_cte_scopes,
888            depth,
889        );
890    }
891
892    // 2. Find the select expression for this column
893    let select_expr = find_select_expr(effective_expr, &column, dialect)?;
894    let column_name = resolve_column_name(&column, &select_expr);
895
896    // 3. Trim source if requested
897    let node_source = if trim_selects {
898        trim_source(effective_expr, &select_expr)
899    } else {
900        effective_expr.clone()
901    };
902
903    // 4. Create the lineage node
904    let mut node = LineageNode::new(&column_name, select_expr.clone(), node_source);
905    apply_scope_context(&mut node, scope, source_name, reference_node_name);
906
907    // 5. Star handling — add downstream for each source
908    if let Expression::Star(star) = &select_expr {
909        let star_table = star
910            .table
911            .as_ref()
912            .map(|identifier| identifier.name.as_str());
913        for (name, source_info) in &scope.sources {
914            if let Some(star_table) = star_table {
915                let table_matches = name.eq_ignore_ascii_case(star_table)
916                    || source_info
917                        .alias
918                        .as_deref()
919                        .is_some_and(|alias| alias.eq_ignore_ascii_case(star_table))
920                    || matches!(
921                        &source_info.expression,
922                        Expression::Table(table_ref)
923                            if table_name_from_table_ref(table_ref).eq_ignore_ascii_case(star_table)
924                    );
925                if !table_matches {
926                    continue;
927                }
928            }
929
930            let mut child = LineageNode::new(
931                format!("{}.*", name),
932                Expression::Star(crate::expressions::Star {
933                    table: star.table.clone(),
934                    except: None,
935                    replace: None,
936                    rename: None,
937                    trailing_comments: vec![],
938                    span: None,
939                }),
940                source_info.expression.clone(),
941            );
942            apply_source_info_context(&mut child, name, source_info);
943            node.downstream.push(child);
944        }
945        return Ok(node);
946    }
947
948    // 6. Subqueries in select — trace through scalar subqueries
949    for query in query_expressions_in_scope(&select_expr) {
950        for sq_scope in &scope.subquery_scopes {
951            if sq_scope.expression == *query {
952                if let Ok(child) = to_node_inner(
953                    ColumnRef::Index(0),
954                    sq_scope,
955                    dialect,
956                    &column_name,
957                    "",
958                    "",
959                    trim_selects,
960                    &descendant_cte_scopes,
961                    depth + 1,
962                ) {
963                    node.downstream.push(child);
964                }
965                break;
966            }
967        }
968    }
969
970    // 7. Column references — trace each column to its source
971    let col_refs = find_column_refs_in_expr_with_select(&select_expr, effective_expr, dialect);
972    for col_ref in col_refs {
973        let col_name = &col_ref.column;
974        if let Some(ref table_id) = col_ref.table {
975            let tbl = &table_id.name;
976            resolve_qualified_column(
977                &mut node,
978                scope,
979                dialect,
980                tbl,
981                col_name,
982                &column_name,
983                trim_selects,
984                &all_cte_scopes,
985                depth,
986            );
987        } else {
988            if let Some(alias_expr) =
989                find_prior_select_alias_expr(effective_expr, &select_expr, col_name, dialect)
990            {
991                for alias_ref in
992                    find_column_refs_in_expr_with_select(&alias_expr, effective_expr, dialect)
993                {
994                    if let Some(ref table_id) = alias_ref.table {
995                        resolve_qualified_column(
996                            &mut node,
997                            scope,
998                            dialect,
999                            &table_id.name,
1000                            &alias_ref.column,
1001                            &column_name,
1002                            trim_selects,
1003                            &all_cte_scopes,
1004                            depth,
1005                        );
1006                    } else {
1007                        resolve_unqualified_column(
1008                            &mut node,
1009                            scope,
1010                            dialect,
1011                            &alias_ref.column,
1012                            &column_name,
1013                            trim_selects,
1014                            &all_cte_scopes,
1015                            depth,
1016                        );
1017                    }
1018                }
1019                continue;
1020            }
1021
1022            resolve_unqualified_column(
1023                &mut node,
1024                scope,
1025                dialect,
1026                col_name,
1027                &column_name,
1028                trim_selects,
1029                &all_cte_scopes,
1030                depth,
1031            );
1032        }
1033    }
1034
1035    Ok(node)
1036}
1037
1038fn descendant_cte_scope_clones(all_cte_scopes: &[&Scope], current_scope: &Scope) -> Vec<Scope> {
1039    all_cte_scopes
1040        .iter()
1041        .filter(|scope| scope.expression != current_scope.expression)
1042        .map(|scope| (*scope).clone())
1043        .collect()
1044}
1045
1046fn effective_scope_expression(expr: &Expression) -> &Expression {
1047    match expr {
1048        Expression::Cte(cte) => effective_scope_expression(&cte.this),
1049        Expression::Subquery(subquery) => effective_scope_expression(&subquery.this),
1050        Expression::Paren(paren) => effective_scope_expression(&paren.this),
1051        other => other,
1052    }
1053}
1054
1055fn query_expressions_in_scope(expr: &Expression) -> Vec<&Expression> {
1056    let mut queries = Vec::new();
1057    let mut seen = HashSet::new();
1058
1059    for node in find_all_in_scope(
1060        expr,
1061        |node| {
1062            matches!(
1063                node,
1064                Expression::Subquery(subquery) if subquery.alias.is_none()
1065            ) || matches!(
1066                node,
1067                Expression::Exists(_) | Expression::In(_) | Expression::Any(_) | Expression::All(_)
1068            )
1069        },
1070        false,
1071    ) {
1072        let query = match node {
1073            Expression::Subquery(subquery) if subquery.alias.is_none() => Some(&subquery.this),
1074            Expression::Exists(exists) => Some(&exists.this),
1075            Expression::In(in_expr) => in_expr.query.as_ref(),
1076            Expression::Any(quantified) | Expression::All(quantified) => Some(&quantified.subquery),
1077            _ => None,
1078        };
1079
1080        if let Some(query) = query {
1081            let key = query as *const Expression as usize;
1082            if seen.insert(key) {
1083                queries.push(query);
1084            }
1085        }
1086    }
1087
1088    queries
1089}
1090
1091// ---------------------------------------------------------------------------
1092// Set operation handling
1093// ---------------------------------------------------------------------------
1094
1095fn handle_set_operation(
1096    column: &ColumnRef<'_>,
1097    scope: &Scope,
1098    dialect: Option<DialectType>,
1099    scope_name: &str,
1100    source_name: &str,
1101    reference_node_name: &str,
1102    trim_selects: bool,
1103    ancestor_cte_scopes: &[Scope],
1104    depth: usize,
1105) -> Result<LineageNode> {
1106    let scope_expr = &scope.expression;
1107
1108    // Determine column index
1109    let col_index = match column {
1110        ColumnRef::Name(name) => column_to_index(scope_expr, name, dialect)?,
1111        ColumnRef::Index(i) => *i,
1112    };
1113
1114    let col_name = match column {
1115        ColumnRef::Name(name) => name.to_string(),
1116        ColumnRef::Index(_) => format!("_{col_index}"),
1117    };
1118
1119    let mut node = LineageNode::new(&col_name, scope_expr.clone(), scope_expr.clone());
1120    apply_scope_context(&mut node, scope, source_name, reference_node_name);
1121
1122    // Recurse into each union branch
1123    for branch_scope in &scope.union_scopes {
1124        if let Ok(child) = to_node_inner(
1125            ColumnRef::Index(col_index),
1126            branch_scope,
1127            dialect,
1128            scope_name,
1129            "",
1130            "",
1131            trim_selects,
1132            ancestor_cte_scopes,
1133            depth + 1,
1134        ) {
1135            node.downstream.push(child);
1136        }
1137    }
1138
1139    Ok(node)
1140}
1141
1142// ---------------------------------------------------------------------------
1143// Column resolution helpers
1144// ---------------------------------------------------------------------------
1145
1146fn resolve_qualified_column(
1147    node: &mut LineageNode,
1148    scope: &Scope,
1149    dialect: Option<DialectType>,
1150    table: &str,
1151    col_name: &str,
1152    parent_name: &str,
1153    trim_selects: bool,
1154    all_cte_scopes: &[&Scope],
1155    depth: usize,
1156) {
1157    // Resolve CTE alias: if `table` is a FROM alias for a CTE (e.g., `FROM my_cte AS t`),
1158    // resolve it to the actual CTE name so the CTE scope lookup succeeds.
1159    let resolved_cte_name = resolve_cte_alias(scope, table);
1160    let effective_table = resolved_cte_name.as_deref().unwrap_or(table);
1161
1162    if let Some(source_info) = scope
1163        .sources
1164        .get(table)
1165        .or_else(|| scope.sources.get(effective_table))
1166    {
1167        match &source_info.expression {
1168            Expression::Pivot(pivot) => {
1169                if attach_pivot_dependencies(
1170                    node,
1171                    scope,
1172                    dialect,
1173                    pivot,
1174                    col_name,
1175                    trim_selects,
1176                    all_cte_scopes,
1177                    depth,
1178                ) {
1179                    return;
1180                }
1181            }
1182            Expression::Unpivot(unpivot) => {
1183                if attach_unpivot_dependencies(
1184                    node,
1185                    scope,
1186                    dialect,
1187                    unpivot,
1188                    col_name,
1189                    trim_selects,
1190                    all_cte_scopes,
1191                    depth,
1192                ) {
1193                    return;
1194                }
1195            }
1196            _ => {}
1197        }
1198    }
1199
1200    // Check if table is a CTE reference — check both the current scope's cte_sources
1201    // and ancestor CTE scopes (for sibling CTEs in parent WITH clauses).
1202    let is_cte = scope.cte_sources.contains_key(effective_table)
1203        || all_cte_scopes.iter().any(
1204            |s| matches!(&s.expression, Expression::Cte(cte) if cte.alias.name == effective_table),
1205        );
1206    if is_cte {
1207        if let Some(child_scope) = find_child_scope_in(all_cte_scopes, scope, effective_table) {
1208            // Build ancestor CTE scopes from all_cte_scopes for the recursive call
1209            let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
1210            if let Ok(child) = to_node_inner(
1211                ColumnRef::Name(col_name),
1212                child_scope,
1213                dialect,
1214                parent_name,
1215                effective_table,
1216                parent_name,
1217                trim_selects,
1218                &ancestors,
1219                depth + 1,
1220            ) {
1221                node.downstream.push(child);
1222                return;
1223            }
1224        }
1225
1226        if let Some(source_info) = scope
1227            .sources
1228            .get(table)
1229            .or_else(|| scope.sources.get(effective_table))
1230            .filter(|source_info| source_info.kind == SourceKind::Cte)
1231        {
1232            node.downstream.push(make_table_column_node_from_source(
1233                effective_table,
1234                col_name,
1235                source_info,
1236            ));
1237            return;
1238        }
1239    }
1240
1241    // Check if table is a derived table (is_scope = true in sources)
1242    if let Some(source_info) = scope.sources.get(table) {
1243        if source_info.is_scope {
1244            if let Some(child_scope) = find_child_scope(scope, table) {
1245                let ancestors: Vec<Scope> = all_cte_scopes.iter().map(|s| (*s).clone()).collect();
1246                if let Ok(child) = to_node_inner(
1247                    ColumnRef::Name(col_name),
1248                    child_scope,
1249                    dialect,
1250                    parent_name,
1251                    table,
1252                    parent_name,
1253                    trim_selects,
1254                    &ancestors,
1255                    depth + 1,
1256                ) {
1257                    node.downstream.push(child);
1258                    return;
1259                }
1260            }
1261        }
1262    }
1263
1264    // Base table source found in current scope: preserve alias in the display name
1265    // but store the resolved table expression and name for downstream consumers.
1266    if let Some(source_info) = scope.sources.get(table) {
1267        if !source_info.is_scope {
1268            let mut child = make_table_column_node_from_source(table, col_name, source_info);
1269            if source_info.kind == SourceKind::Virtual {
1270                attach_virtual_source_dependencies(
1271                    &mut child,
1272                    scope,
1273                    dialect,
1274                    table,
1275                    &source_info.expression,
1276                    trim_selects,
1277                    all_cte_scopes,
1278                    depth,
1279                );
1280            }
1281            node.downstream.push(child);
1282            return;
1283        }
1284    }
1285
1286    // Base table or unresolved — terminal node
1287    node.downstream
1288        .push(make_table_column_node(table, col_name));
1289}
1290
1291fn attach_pivot_dependencies(
1292    node: &mut LineageNode,
1293    scope: &Scope,
1294    dialect: Option<DialectType>,
1295    pivot: &crate::expressions::Pivot,
1296    col_name: &str,
1297    trim_selects: bool,
1298    all_cte_scopes: &[&Scope],
1299    depth: usize,
1300) -> bool {
1301    if pivot.unpivot {
1302        return false;
1303    }
1304
1305    let mapping = pivot_lineage_column_mapping(pivot, scope, dialect);
1306    let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1307        if pivot_implicit_source_column(pivot, col_name) {
1308            let col_ref = SimpleColumnRef {
1309                table: None,
1310                column: col_name.to_string(),
1311            };
1312            attach_pivot_input_column(
1313                node,
1314                scope,
1315                dialect,
1316                &pivot.this,
1317                &col_ref,
1318                trim_selects,
1319                all_cte_scopes,
1320                depth,
1321            );
1322            return true;
1323        }
1324        return false;
1325    };
1326
1327    for col_ref in input_columns {
1328        attach_pivot_input_column(
1329            node,
1330            scope,
1331            dialect,
1332            &pivot.this,
1333            col_ref,
1334            trim_selects,
1335            all_cte_scopes,
1336            depth,
1337        );
1338    }
1339    true
1340}
1341
1342fn attach_unpivot_dependencies(
1343    node: &mut LineageNode,
1344    scope: &Scope,
1345    dialect: Option<DialectType>,
1346    unpivot: &crate::expressions::Unpivot,
1347    col_name: &str,
1348    trim_selects: bool,
1349    all_cte_scopes: &[&Scope],
1350    depth: usize,
1351) -> bool {
1352    let mapping = unpivot_column_mapping(unpivot, dialect);
1353    let Some(input_columns) = mapping.get(&normalize_column_name(col_name, dialect)) else {
1354        return false;
1355    };
1356
1357    for col_ref in input_columns {
1358        attach_pivot_input_column(
1359            node,
1360            scope,
1361            dialect,
1362            &unpivot.this,
1363            col_ref,
1364            trim_selects,
1365            all_cte_scopes,
1366            depth,
1367        );
1368    }
1369    true
1370}
1371
1372fn pivot_column_mapping(
1373    pivot: &crate::expressions::Pivot,
1374    dialect: Option<DialectType>,
1375) -> HashMap<String, Vec<SimpleColumnRef>> {
1376    let aggregations = pivot_aggregation_expressions(pivot);
1377    let output_columns = pivot_generated_output_columns(pivot, dialect);
1378    if aggregations.is_empty() || output_columns.is_empty() {
1379        return HashMap::new();
1380    }
1381
1382    let mut mapping = HashMap::new();
1383    for (agg_index, agg) in aggregations.iter().enumerate() {
1384        let input_columns = find_column_refs_in_expr(agg, dialect);
1385        if input_columns.is_empty() {
1386            continue;
1387        }
1388        for col_index in (agg_index..output_columns.len()).step_by(aggregations.len()) {
1389            mapping.insert(
1390                normalize_column_name(&output_columns[col_index], dialect),
1391                input_columns.clone(),
1392            );
1393        }
1394    }
1395    mapping
1396}
1397
1398fn pivot_lineage_column_mapping(
1399    pivot: &crate::expressions::Pivot,
1400    scope: &Scope,
1401    dialect: Option<DialectType>,
1402) -> HashMap<String, Vec<SimpleColumnRef>> {
1403    let mut mapping = pivot_column_mapping(pivot, dialect);
1404    let Some(pre_pivot_columns) = pre_pivot_output_columns(&pivot.this, scope) else {
1405        return mapping;
1406    };
1407
1408    let output_columns = pivot_output_columns(pivot, &pre_pivot_columns, dialect);
1409    if output_columns.is_empty() {
1410        return mapping;
1411    }
1412
1413    let base_mapping = mapping.clone();
1414    for (post_name, pre_name) in output_columns {
1415        let normalized_pre = normalize_column_name(&pre_name, dialect);
1416        let normalized_post = normalize_column_name(&post_name, dialect);
1417
1418        if let Some(input_columns) = base_mapping.get(&normalized_pre) {
1419            mapping.insert(normalized_post, input_columns.clone());
1420        } else {
1421            mapping.insert(
1422                normalized_post,
1423                vec![SimpleColumnRef {
1424                    table: None,
1425                    column: pre_name,
1426                }],
1427            );
1428        }
1429    }
1430
1431    mapping
1432}
1433
1434fn pre_pivot_output_columns(source: &Expression, scope: &Scope) -> Option<Vec<String>> {
1435    match source {
1436        Expression::Subquery(subquery) => known_output_columns(&subquery.this),
1437        Expression::Table(table) if table.schema.is_none() && table.catalog.is_none() => scope
1438            .cte_sources
1439            .get(&table.name.name)
1440            .and_then(|source| known_output_columns(&source.expression)),
1441        Expression::Paren(paren) => pre_pivot_output_columns(&paren.this, scope),
1442        _ => None,
1443    }
1444}
1445
1446fn known_output_columns(expression: &Expression) -> Option<Vec<String>> {
1447    let expression = match expression {
1448        Expression::Cte(cte) => &cte.this,
1449        Expression::Subquery(subquery) => &subquery.this,
1450        other => other,
1451    };
1452    let columns = crate::ast_transforms::get_output_column_names(expression);
1453    if columns.is_empty() || columns.iter().any(|column| column == "*") {
1454        None
1455    } else {
1456        Some(columns)
1457    }
1458}
1459
1460fn pivot_output_columns(
1461    pivot: &crate::expressions::Pivot,
1462    pre_pivot_columns: &[String],
1463    dialect: Option<DialectType>,
1464) -> Vec<(String, String)> {
1465    let generated_outputs = pivot_generated_output_columns(pivot, dialect);
1466    let excluded = pivot_excluded_source_columns(pivot, dialect);
1467
1468    if excluded.is_empty() || generated_outputs.is_empty() {
1469        return Vec::new();
1470    }
1471
1472    let mut pre_rename: Vec<String> = pre_pivot_columns
1473        .iter()
1474        .filter(|column| !excluded.contains(&normalize_column_name(column, dialect)))
1475        .cloned()
1476        .collect();
1477    pre_rename.extend(generated_outputs);
1478
1479    let post_rename = if pivot.alias_columns.is_empty() {
1480        pre_rename.clone()
1481    } else {
1482        let mut names: Vec<String> = pivot
1483            .alias_columns
1484            .iter()
1485            .map(|column| column.name.clone())
1486            .collect();
1487        names.extend(pre_rename.iter().skip(names.len()).cloned());
1488        names
1489    };
1490
1491    post_rename.into_iter().zip(pre_rename).collect()
1492}
1493
1494fn pivot_excluded_source_columns(
1495    pivot: &crate::expressions::Pivot,
1496    dialect: Option<DialectType>,
1497) -> HashSet<String> {
1498    pivot
1499        .fields
1500        .iter()
1501        .chain(pivot.expressions.iter())
1502        .chain(pivot.using.iter())
1503        .flat_map(|expr| find_column_refs_in_expr(expr, dialect))
1504        .map(|column| normalize_column_name(&column.column, dialect))
1505        .collect()
1506}
1507
1508fn pivot_generated_output_columns(
1509    pivot: &crate::expressions::Pivot,
1510    _dialect: Option<DialectType>,
1511) -> Vec<String> {
1512    let fields = pivot_field_output_names(pivot);
1513    if fields.is_empty() {
1514        return Vec::new();
1515    }
1516
1517    let aggregations = pivot_aggregation_expressions(pivot);
1518    if aggregations.is_empty() {
1519        return Vec::new();
1520    }
1521
1522    let needs_suffix = aggregations.len() > 1;
1523    let mut outputs = Vec::new();
1524    for field in fields {
1525        for aggregation in aggregations {
1526            if let Some(suffix) = pivot_aggregation_output_suffix(aggregation, needs_suffix) {
1527                outputs.push(format!("{field}_{suffix}"));
1528            } else {
1529                outputs.push(field.clone());
1530            }
1531        }
1532    }
1533    outputs
1534}
1535
1536fn pivot_aggregation_expressions(pivot: &crate::expressions::Pivot) -> &[Expression] {
1537    if pivot.using.is_empty() {
1538        &pivot.expressions
1539    } else {
1540        &pivot.using
1541    }
1542}
1543
1544fn pivot_aggregation_output_suffix(expr: &Expression, needs_suffix: bool) -> Option<String> {
1545    match expr {
1546        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1547        _ if needs_suffix => Generator::sql(expr).ok().map(|sql| sql.to_lowercase()),
1548        _ => None,
1549    }
1550}
1551
1552fn pivot_field_output_names(pivot: &crate::expressions::Pivot) -> Vec<String> {
1553    let mut names = Vec::new();
1554    for field in &pivot.fields {
1555        if let Expression::In(in_expr) = field {
1556            for expr in &in_expr.expressions {
1557                if let Some(name) = pivot_expr_output_name(expr) {
1558                    names.push(name);
1559                }
1560            }
1561        }
1562    }
1563    names
1564}
1565
1566fn pivot_expr_output_name(expr: &Expression) -> Option<String> {
1567    match expr {
1568        Expression::PivotAlias(alias) => pivot_expr_output_name(&alias.alias),
1569        Expression::Alias(alias) => Some(alias.alias.name.clone()),
1570        Expression::Identifier(identifier) => Some(identifier.name.clone()),
1571        Expression::Column(column) => Some(column.name.name.clone()),
1572        Expression::Literal(literal) => Some(literal.value_str().to_string()),
1573        Expression::Var(var) => Some(var.this.clone()),
1574        Expression::Tuple(tuple) => tuple.expressions.first().and_then(pivot_expr_output_name),
1575        _ => None,
1576    }
1577}
1578
1579fn pivot_implicit_source_column(pivot: &crate::expressions::Pivot, col_name: &str) -> bool {
1580    let pivot_columns: HashSet<String> = pivot
1581        .fields
1582        .iter()
1583        .filter_map(|field| match field {
1584            Expression::In(in_expr) => Some(&in_expr.this),
1585            _ => None,
1586        })
1587        .flat_map(|expr| find_column_refs_in_expr(expr, None))
1588        .map(|col| col.column.to_lowercase())
1589        .collect();
1590    let aggregation_columns: HashSet<String> = pivot
1591        .expressions
1592        .iter()
1593        .flat_map(|expr| find_column_refs_in_expr(expr, None))
1594        .map(|col| col.column.to_lowercase())
1595        .collect();
1596
1597    let normalized = col_name.to_lowercase();
1598    !pivot_columns.contains(&normalized) && !aggregation_columns.contains(&normalized)
1599}
1600
1601fn unpivot_column_mapping(
1602    unpivot: &crate::expressions::Unpivot,
1603    dialect: Option<DialectType>,
1604) -> HashMap<String, Vec<SimpleColumnRef>> {
1605    let value_columns: Vec<String> = std::iter::once(unpivot.value_column.name.clone())
1606        .chain(
1607            unpivot
1608                .extra_value_columns
1609                .iter()
1610                .map(|column| column.name.clone()),
1611        )
1612        .collect();
1613    let mut all_input_columns = Vec::new();
1614    let mut value_input_columns: Vec<Vec<SimpleColumnRef>> = vec![Vec::new(); value_columns.len()];
1615
1616    for entry in &unpivot.columns {
1617        let columns = unpivot_entry_columns(entry);
1618        all_input_columns.extend(columns.clone());
1619        if columns.len() == value_columns.len() {
1620            for (idx, col_ref) in columns.into_iter().enumerate() {
1621                value_input_columns[idx].push(col_ref);
1622            }
1623        } else {
1624            for inputs in &mut value_input_columns {
1625                inputs.extend(columns.clone());
1626            }
1627        }
1628    }
1629
1630    let mut mapping = HashMap::new();
1631    mapping.insert(
1632        normalize_column_name(&unpivot.name_column.name, dialect),
1633        all_input_columns.clone(),
1634    );
1635    for (idx, value_column) in value_columns.iter().enumerate() {
1636        mapping.insert(
1637            normalize_column_name(value_column, dialect),
1638            value_input_columns.get(idx).cloned().unwrap_or_default(),
1639        );
1640    }
1641    mapping
1642}
1643
1644fn unpivot_entry_columns(expr: &Expression) -> Vec<SimpleColumnRef> {
1645    match expr {
1646        Expression::PivotAlias(alias) => unpivot_entry_columns(&alias.this),
1647        Expression::Tuple(tuple) => tuple
1648            .expressions
1649            .iter()
1650            .flat_map(unpivot_entry_columns)
1651            .collect(),
1652        Expression::Column(column) => vec![SimpleColumnRef {
1653            table: column.table.clone(),
1654            column: column.name.name.clone(),
1655        }],
1656        Expression::Identifier(identifier) => vec![SimpleColumnRef {
1657            table: None,
1658            column: identifier.name.clone(),
1659        }],
1660        _ => find_column_refs_in_expr(expr, None),
1661    }
1662}
1663
1664fn attach_pivot_input_column(
1665    node: &mut LineageNode,
1666    scope: &Scope,
1667    dialect: Option<DialectType>,
1668    source_expr: &Expression,
1669    col_ref: &SimpleColumnRef,
1670    trim_selects: bool,
1671    all_cte_scopes: &[&Scope],
1672    depth: usize,
1673) {
1674    match source_expr {
1675        Expression::Table(table) => {
1676            let table_name = col_ref
1677                .table
1678                .as_ref()
1679                .map(|identifier| identifier.name.as_str())
1680                .unwrap_or(table.name.name.as_str());
1681            if scope.cte_sources.contains_key(table_name) {
1682                resolve_qualified_column(
1683                    node,
1684                    scope,
1685                    dialect,
1686                    table_name,
1687                    &col_ref.column,
1688                    &node.name.clone(),
1689                    trim_selects,
1690                    all_cte_scopes,
1691                    depth + 1,
1692                );
1693            } else {
1694                let mut source = ScopeSourceInfo::new(
1695                    Expression::Table(Box::new(table.as_ref().clone())),
1696                    false,
1697                    SourceKind::Table,
1698                );
1699                if let Some(alias) = &table.alias {
1700                    source = source.with_alias(alias.name.clone());
1701                }
1702                let source_key = table
1703                    .alias
1704                    .as_ref()
1705                    .map(|alias| alias.name.as_str())
1706                    .unwrap_or(table.name.name.as_str());
1707                node.downstream.push(make_table_column_node_from_source(
1708                    source_key,
1709                    &col_ref.column,
1710                    &source,
1711                ));
1712            }
1713        }
1714        Expression::Subquery(subquery) => {
1715            let source_scope = build_scope(&subquery.this);
1716            let child = if let Some(table) = &col_ref.table {
1717                let mut child_node = LineageNode::new(
1718                    &col_ref.column,
1719                    subquery.this.clone(),
1720                    subquery.this.clone(),
1721                );
1722                resolve_qualified_column(
1723                    &mut child_node,
1724                    &source_scope,
1725                    dialect,
1726                    &table.name,
1727                    &col_ref.column,
1728                    &node.name.clone(),
1729                    trim_selects,
1730                    all_cte_scopes,
1731                    depth + 1,
1732                );
1733                Ok(child_node)
1734            } else {
1735                to_node_inner(
1736                    ColumnRef::Name(&col_ref.column),
1737                    &source_scope,
1738                    dialect,
1739                    "",
1740                    "",
1741                    "",
1742                    trim_selects,
1743                    &all_cte_scopes
1744                        .iter()
1745                        .map(|scope| (*scope).clone())
1746                        .collect::<Vec<_>>(),
1747                    depth + 1,
1748                )
1749            };
1750            if let Ok(child) = child {
1751                node.downstream.push(child);
1752            }
1753        }
1754        Expression::Paren(paren) => attach_pivot_input_column(
1755            node,
1756            scope,
1757            dialect,
1758            &paren.this,
1759            col_ref,
1760            trim_selects,
1761            all_cte_scopes,
1762            depth,
1763        ),
1764        _ => {
1765            if let Some(table) = &col_ref.table {
1766                resolve_qualified_column(
1767                    node,
1768                    scope,
1769                    dialect,
1770                    &table.name,
1771                    &col_ref.column,
1772                    &node.name.clone(),
1773                    trim_selects,
1774                    all_cte_scopes,
1775                    depth + 1,
1776                );
1777            } else {
1778                node.downstream
1779                    .push(make_table_column_node("_", &col_ref.column));
1780            }
1781        }
1782    }
1783}
1784
1785/// Resolve a FROM alias to the original CTE name.
1786///
1787/// When a query uses `FROM my_cte AS alias`, the scope's `sources` map contains
1788/// `"alias"` → CTE expression, but `cte_sources` only contains `"my_cte"`.
1789/// This function checks if `name` is such an alias and returns the CTE name.
1790fn resolve_cte_alias(scope: &Scope, name: &str) -> Option<String> {
1791    // If it's already a known CTE name, no resolution needed
1792    if scope.cte_sources.contains_key(name) {
1793        return None;
1794    }
1795    // Check if the source's expression is a CTE — if so, extract the CTE name
1796    if let Some(source_info) = scope.sources.get(name) {
1797        if source_info.is_scope {
1798            if let Expression::Cte(cte) = &source_info.expression {
1799                let cte_name = &cte.alias.name;
1800                if scope.cte_sources.contains_key(cte_name) {
1801                    return Some(cte_name.clone());
1802                }
1803            }
1804        }
1805    }
1806    None
1807}
1808
1809fn resolve_unqualified_column(
1810    node: &mut LineageNode,
1811    scope: &Scope,
1812    dialect: Option<DialectType>,
1813    col_name: &str,
1814    parent_name: &str,
1815    trim_selects: bool,
1816    all_cte_scopes: &[&Scope],
1817    depth: usize,
1818) {
1819    // Try to find which source this column belongs to.
1820    // Build the source list from the actual FROM/JOIN clauses to avoid
1821    // mixing in CTE definitions that are in scope but not referenced.
1822    let from_source_names = source_names_from_from_join(scope);
1823
1824    if let Some(tbl) = unique_virtual_source_for_column(scope, &from_source_names, col_name) {
1825        resolve_qualified_column(
1826            node,
1827            scope,
1828            dialect,
1829            &tbl,
1830            col_name,
1831            parent_name,
1832            trim_selects,
1833            all_cte_scopes,
1834            depth,
1835        );
1836        return;
1837    }
1838
1839    if from_source_names.len() == 1 {
1840        let tbl = &from_source_names[0];
1841        resolve_qualified_column(
1842            node,
1843            scope,
1844            dialect,
1845            tbl,
1846            col_name,
1847            parent_name,
1848            trim_selects,
1849            all_cte_scopes,
1850            depth,
1851        );
1852        return;
1853    }
1854
1855    // Multiple sources — can't resolve without schema info, add unqualified node
1856    let child = LineageNode::new(
1857        col_name.to_string(),
1858        Expression::Column(Box::new(crate::expressions::Column {
1859            name: crate::expressions::Identifier::new(col_name.to_string()),
1860            table: None,
1861            join_mark: false,
1862            trailing_comments: vec![],
1863            span: None,
1864            inferred_type: None,
1865        })),
1866        node.source.clone(),
1867    );
1868    node.downstream.push(child);
1869}
1870
1871fn unique_virtual_source_for_column(
1872    scope: &Scope,
1873    source_names: &[String],
1874    col_name: &str,
1875) -> Option<String> {
1876    let mut matches = source_names.iter().filter_map(|source_name| {
1877        let source = scope.sources.get(source_name)?;
1878        if source.kind == SourceKind::Virtual
1879            && virtual_source_output_columns(source)
1880                .any(|column| column.eq_ignore_ascii_case(col_name))
1881        {
1882            Some(source_name.clone())
1883        } else {
1884            None
1885        }
1886    });
1887
1888    let first = matches.next()?;
1889    if matches.next().is_none() {
1890        Some(first)
1891    } else {
1892        None
1893    }
1894}
1895
1896fn virtual_source_output_columns(
1897    source_info: &ScopeSourceInfo,
1898) -> Box<dyn Iterator<Item = String> + '_> {
1899    match &source_info.expression {
1900        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1901        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1902            Box::new(alias_output_columns(alias))
1903        }
1904        Expression::Lateral(lateral) => Box::new(lateral_output_columns(lateral)),
1905        Expression::LateralView(lateral_view) => {
1906            Box::new(lateral_view_output_columns(lateral_view))
1907        }
1908        _ => Box::new(source_info.alias.clone().into_iter()),
1909    }
1910}
1911
1912fn unnest_output_columns(
1913    unnest: &crate::expressions::UnnestFunc,
1914) -> impl Iterator<Item = String> + '_ {
1915    unnest
1916        .alias
1917        .iter()
1918        .map(|alias| alias.name.clone())
1919        .chain(unnest.offset_alias.iter().map(|alias| alias.name.clone()))
1920}
1921
1922fn alias_output_columns(
1923    alias: &crate::expressions::Alias,
1924) -> Box<dyn Iterator<Item = String> + '_> {
1925    if alias.column_aliases.is_empty() {
1926        Box::new(std::iter::once(alias.alias.name.clone()))
1927    } else {
1928        Box::new(
1929            alias
1930                .column_aliases
1931                .iter()
1932                .map(|column| column.name.clone()),
1933        )
1934    }
1935}
1936
1937fn lateral_output_columns(
1938    lateral: &crate::expressions::Lateral,
1939) -> Box<dyn Iterator<Item = String> + '_> {
1940    if lateral.column_aliases.is_empty() {
1941        default_virtual_output_columns(&lateral.this)
1942    } else {
1943        Box::new(lateral.column_aliases.iter().cloned())
1944    }
1945}
1946
1947fn lateral_view_output_columns(
1948    lateral_view: &crate::expressions::LateralView,
1949) -> Box<dyn Iterator<Item = String> + '_> {
1950    Box::new(
1951        lateral_view
1952            .column_aliases
1953            .iter()
1954            .map(|column| column.name.clone()),
1955    )
1956}
1957
1958fn default_virtual_output_columns(expr: &Expression) -> Box<dyn Iterator<Item = String> + '_> {
1959    match expr {
1960        Expression::Unnest(unnest) => Box::new(unnest_output_columns(unnest)),
1961        Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
1962            alias_output_columns(alias)
1963        }
1964        Expression::Function(function) if function.name.eq_ignore_ascii_case("FLATTEN") => {
1965            Box::new(
1966                ["seq", "key", "path", "index", "value", "this"]
1967                    .into_iter()
1968                    .map(String::from),
1969            )
1970        }
1971        _ => Box::new(std::iter::empty()),
1972    }
1973}
1974
1975fn attach_virtual_source_dependencies(
1976    node: &mut LineageNode,
1977    scope: &Scope,
1978    dialect: Option<DialectType>,
1979    source_alias: &str,
1980    source_expr: &Expression,
1981    trim_selects: bool,
1982    all_cte_scopes: &[&Scope],
1983    depth: usize,
1984) {
1985    let parent_name = node.name.clone();
1986    let mut seen = HashSet::new();
1987    for col_ref in find_column_refs_in_expr(source_expr, dialect) {
1988        let key = (
1989            col_ref.table.as_ref().map(|t| t.name.clone()),
1990            col_ref.column.clone(),
1991        );
1992        if !seen.insert(key) {
1993            continue;
1994        }
1995
1996        if let Some(table_id) = col_ref.table {
1997            let table = table_id.name;
1998            if table == source_alias {
1999                continue;
2000            }
2001            resolve_qualified_column(
2002                node,
2003                scope,
2004                dialect,
2005                &table,
2006                &col_ref.column,
2007                &parent_name,
2008                trim_selects,
2009                all_cte_scopes,
2010                depth + 1,
2011            );
2012        } else {
2013            let non_virtual_sources = non_virtual_source_names_from_from_join(scope);
2014            if non_virtual_sources.len() == 1 {
2015                resolve_qualified_column(
2016                    node,
2017                    scope,
2018                    dialect,
2019                    &non_virtual_sources[0],
2020                    &col_ref.column,
2021                    &parent_name,
2022                    trim_selects,
2023                    all_cte_scopes,
2024                    depth + 1,
2025                );
2026            }
2027        }
2028    }
2029}
2030
2031fn source_names_from_from_join(scope: &Scope) -> Vec<String> {
2032    fn source_name(expr: &Expression) -> Option<String> {
2033        match expr {
2034            Expression::Table(table) => Some(
2035                table
2036                    .alias
2037                    .as_ref()
2038                    .map(|a| a.name.clone())
2039                    .unwrap_or_else(|| table.name.name.clone()),
2040            ),
2041            Expression::Subquery(subquery) => {
2042                subquery.alias.as_ref().map(|alias| alias.name.clone())
2043            }
2044            Expression::Unnest(unnest) => unnest.alias.as_ref().map(|alias| alias.name.clone()),
2045            Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) => {
2046                Some(alias.alias.name.clone())
2047            }
2048            Expression::Alias(alias) if is_query_like_relation(&alias.this) => {
2049                Some(alias.alias.name.clone())
2050            }
2051            Expression::Lateral(lateral) => lateral.alias.clone(),
2052            Expression::LateralView(lateral_view) => lateral_view
2053                .table_alias
2054                .as_ref()
2055                .or_else(|| lateral_view.column_aliases.first())
2056                .map(|alias| alias.name.clone()),
2057            Expression::Pivot(pivot) => Some(pivot_lineage_source_name(
2058                &pivot.this,
2059                pivot.alias.as_ref().map(|alias| alias.name.as_str()),
2060            )),
2061            Expression::Unpivot(unpivot) => Some(pivot_lineage_source_name(
2062                &unpivot.this,
2063                unpivot.alias.as_ref().map(|alias| alias.name.as_str()),
2064            )),
2065            Expression::Paren(paren) => source_name(&paren.this),
2066            _ => None,
2067        }
2068    }
2069
2070    let effective_expr = match &scope.expression {
2071        Expression::Cte(cte) => &cte.this,
2072        expr => expr,
2073    };
2074
2075    let mut names = Vec::new();
2076    let mut seen = std::collections::HashSet::new();
2077
2078    if let Expression::Select(select) = effective_expr {
2079        if let Some(from) = &select.from {
2080            for expr in &from.expressions {
2081                if let Some(name) = source_name(expr) {
2082                    if !name.is_empty() && seen.insert(name.clone()) {
2083                        names.push(name);
2084                    }
2085                }
2086            }
2087        }
2088        for join in &select.joins {
2089            if is_semi_or_anti_join_kind(join.kind) {
2090                continue;
2091            }
2092            if let Some(name) = source_name(&join.this) {
2093                if !name.is_empty() && seen.insert(name.clone()) {
2094                    names.push(name);
2095                }
2096            }
2097        }
2098        for lateral_view in &select.lateral_views {
2099            if let Some(name) =
2100                source_name(&Expression::LateralView(Box::new(lateral_view.clone())))
2101            {
2102                if !name.is_empty() && seen.insert(name.clone()) {
2103                    names.push(name);
2104                }
2105            }
2106        }
2107    }
2108
2109    names
2110}
2111
2112fn is_semi_or_anti_join_kind(kind: JoinKind) -> bool {
2113    matches!(
2114        kind,
2115        JoinKind::Semi
2116            | JoinKind::Anti
2117            | JoinKind::LeftSemi
2118            | JoinKind::LeftAnti
2119            | JoinKind::RightSemi
2120            | JoinKind::RightAnti
2121    )
2122}
2123
2124fn is_query_like_relation(expr: &Expression) -> bool {
2125    match expr {
2126        Expression::Select(_)
2127        | Expression::Subquery(_)
2128        | Expression::Union(_)
2129        | Expression::Intersect(_)
2130        | Expression::Except(_) => true,
2131        Expression::Paren(paren) => is_query_like_relation(&paren.this),
2132        _ => false,
2133    }
2134}
2135
2136fn derived_source_query(expr: &Expression) -> Option<&Expression> {
2137    match expr {
2138        Expression::Subquery(subquery) => Some(&subquery.this),
2139        Expression::Alias(alias) if is_query_like_relation(&alias.this) => Some(&alias.this),
2140        Expression::Select(_)
2141        | Expression::Union(_)
2142        | Expression::Intersect(_)
2143        | Expression::Except(_) => Some(expr),
2144        Expression::Paren(paren) => derived_source_query(&paren.this),
2145        _ => None,
2146    }
2147}
2148
2149fn expressions_equivalent_after_wrappers(left: &Expression, right: &Expression) -> bool {
2150    left == right || effective_scope_expression(left) == effective_scope_expression(right)
2151}
2152
2153fn non_virtual_source_names_from_from_join(scope: &Scope) -> Vec<String> {
2154    source_names_from_from_join(scope)
2155        .into_iter()
2156        .filter(|name| {
2157            !matches!(
2158                scope.sources.get(name).map(|source| source.kind),
2159                Some(SourceKind::Virtual)
2160            )
2161        })
2162        .collect()
2163}
2164
2165// ---------------------------------------------------------------------------
2166// Helper functions
2167// ---------------------------------------------------------------------------
2168
2169/// Get the alias or name of an expression
2170fn get_alias_or_name(expr: &Expression) -> Option<String> {
2171    match expr {
2172        Expression::Alias(alias) => Some(alias.alias.name.clone()),
2173        Expression::Column(col) => Some(col.name.name.clone()),
2174        Expression::Identifier(id) => Some(id.name.clone()),
2175        Expression::Star(_) => Some("*".to_string()),
2176        // Annotated wraps an expression with trailing comments (e.g. `SELECT\n-- comment\na`).
2177        // Unwrap to get the actual column/alias name from the inner expression.
2178        Expression::Annotated(a) => get_alias_or_name(&a.this),
2179        _ => None,
2180    }
2181}
2182
2183fn find_prior_select_alias_expr(
2184    scope_expr: &Expression,
2185    target_expr: &Expression,
2186    alias_name: &str,
2187    dialect: Option<DialectType>,
2188) -> Option<Expression> {
2189    let Expression::Select(select) = scope_expr else {
2190        return None;
2191    };
2192
2193    let normalized_alias = normalize_column_name(alias_name, dialect);
2194    for expr in &select.expressions {
2195        if expr == target_expr {
2196            return None;
2197        }
2198
2199        if let Expression::Alias(alias) = expr {
2200            if normalize_column_name(&alias.alias.name, dialect) == normalized_alias {
2201                return Some(alias.this.clone());
2202            }
2203        }
2204    }
2205
2206    None
2207}
2208
2209/// Resolve the display name for a column reference.
2210fn resolve_column_name(column: &ColumnRef<'_>, select_expr: &Expression) -> String {
2211    match column {
2212        ColumnRef::Name(n) => n.to_string(),
2213        ColumnRef::Index(_) => get_alias_or_name(select_expr).unwrap_or_else(|| "?".to_string()),
2214    }
2215}
2216
2217/// Find the select expression matching a column reference.
2218fn find_select_expr(
2219    scope_expr: &Expression,
2220    column: &ColumnRef<'_>,
2221    dialect: Option<DialectType>,
2222) -> Result<Expression> {
2223    if let Expression::Select(ref select) = scope_expr {
2224        match column {
2225            ColumnRef::Name(name) => {
2226                let normalized_name = normalize_column_name(name, dialect);
2227                for expr in &select.expressions {
2228                    if let Some(alias_or_name) = get_alias_or_name(expr) {
2229                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2230                            return Ok(expr.clone());
2231                        }
2232                    }
2233                }
2234                if let Some(expr) = synthesize_star_passthrough_expr(select, name) {
2235                    return Ok(expr);
2236                }
2237                Err(crate::error::Error::parse(
2238                    format!("Cannot find column '{}' in query", name),
2239                    0,
2240                    0,
2241                    0,
2242                    0,
2243                ))
2244            }
2245            ColumnRef::Index(idx) => select.expressions.get(*idx).cloned().ok_or_else(|| {
2246                crate::error::Error::parse(format!("Column index {} out of range", idx), 0, 0, 0, 0)
2247            }),
2248        }
2249    } else {
2250        Err(crate::error::Error::parse(
2251            "Expected SELECT expression for column lookup",
2252            0,
2253            0,
2254            0,
2255            0,
2256        ))
2257    }
2258}
2259
2260fn synthesize_star_passthrough_expr(select: &Select, name: &str) -> Option<Expression> {
2261    let sources = get_select_sources(select);
2262    if sources.is_empty() {
2263        return None;
2264    }
2265
2266    let mut candidate_aliases = Vec::new();
2267    let mut seen = HashSet::new();
2268
2269    for expr in &select.expressions {
2270        let aliases = match star_passthrough_source_aliases(expr, &sources) {
2271            StarPassthroughSources::None => continue,
2272            StarPassthroughSources::Ambiguous => return None,
2273            StarPassthroughSources::Aliases(aliases) => aliases,
2274        };
2275
2276        for alias in aliases {
2277            if seen.insert(alias.clone()) {
2278                candidate_aliases.push(alias);
2279            }
2280        }
2281    }
2282
2283    match candidate_aliases.as_slice() {
2284        [alias] => {
2285            let table = Identifier::new(alias.clone());
2286            Some(make_column_expr(name, Some(&table)))
2287        }
2288        _ => None,
2289    }
2290}
2291
/// Outcome of asking which sources a select-list expression passes columns
/// through from.
enum StarPassthroughSources {
    /// The expression is not a star projection, or it designates no usable source.
    None,
    /// More than one source could be meant, so no single source can be named.
    Ambiguous,
    /// The aliases of the sources the star draws from.
    Aliases(Vec<String>),
}
2297
2298fn star_passthrough_source_aliases(
2299    expr: &Expression,
2300    sources: &[SourceInfo],
2301) -> StarPassthroughSources {
2302    match expr {
2303        Expression::Star(star) => star_source_aliases(star.table.as_ref(), sources),
2304        Expression::Column(column) if column.name.name == "*" => {
2305            star_source_aliases(column.table.as_ref(), sources)
2306        }
2307        Expression::Annotated(annotated) => {
2308            star_passthrough_source_aliases(&annotated.this, sources)
2309        }
2310        _ => StarPassthroughSources::None,
2311    }
2312}
2313
2314fn star_source_aliases(
2315    qualifier: Option<&Identifier>,
2316    sources: &[SourceInfo],
2317) -> StarPassthroughSources {
2318    if let Some(qualifier) = qualifier {
2319        let mut aliases = Vec::new();
2320
2321        for source in sources {
2322            if source_matches_star_qualifier(source, qualifier) {
2323                aliases.push(source.alias.clone());
2324            }
2325        }
2326
2327        return match aliases.len() {
2328            0 => StarPassthroughSources::None,
2329            1 => StarPassthroughSources::Aliases(aliases),
2330            _ => StarPassthroughSources::Ambiguous,
2331        };
2332    }
2333
2334    match sources {
2335        // Do not synthesize a source column for unresolved quoted table stars.
2336        // This keeps quoted CTE/table case semantics intact while still allowing
2337        // the schema-less fallback for common unquoted SELECT * passthroughs.
2338        [source] if source.quoted => StarPassthroughSources::None,
2339        [source] => StarPassthroughSources::Aliases(vec![source.alias.clone()]),
2340        [] => StarPassthroughSources::None,
2341        _ => StarPassthroughSources::Ambiguous,
2342    }
2343}
2344
2345fn source_matches_star_qualifier(source: &SourceInfo, qualifier: &Identifier) -> bool {
2346    if source.normalized == normalize_cte_name(qualifier) {
2347        return true;
2348    }
2349
2350    if qualifier.quoted {
2351        source.alias == qualifier.name
2352    } else {
2353        source.alias.eq_ignore_ascii_case(&qualifier.name)
2354    }
2355}
2356
2357/// Find the positional index of a column name in a set operation's first SELECT branch.
2358fn column_to_index(
2359    set_op_expr: &Expression,
2360    name: &str,
2361    dialect: Option<DialectType>,
2362) -> Result<usize> {
2363    let normalized_name = normalize_column_name(name, dialect);
2364    let mut expr = set_op_expr;
2365    loop {
2366        match expr {
2367            Expression::Union(u) => expr = &u.left,
2368            Expression::Intersect(i) => expr = &i.left,
2369            Expression::Except(e) => expr = &e.left,
2370            Expression::Subquery(subquery) => expr = &subquery.this,
2371            Expression::Cte(cte) => expr = &cte.this,
2372            Expression::Paren(paren) => expr = &paren.this,
2373            Expression::Select(select) => {
2374                for (i, e) in select.expressions.iter().enumerate() {
2375                    if let Some(alias_or_name) = get_alias_or_name(e) {
2376                        if normalize_column_name(&alias_or_name, dialect) == normalized_name {
2377                            return Ok(i);
2378                        }
2379                    }
2380                }
2381                return Err(crate::error::Error::parse(
2382                    format!("Cannot find column '{}' in set operation", name),
2383                    0,
2384                    0,
2385                    0,
2386                    0,
2387                ));
2388            }
2389            _ => {
2390                return Err(crate::error::Error::parse(
2391                    "Expected SELECT or set operation",
2392                    0,
2393                    0,
2394                    0,
2395                    0,
2396                ))
2397            }
2398        }
2399    }
2400}
2401
2402fn normalize_column_name(name: &str, dialect: Option<DialectType>) -> String {
2403    normalize_name(name, dialect, false, true)
2404}
2405
2406/// If trim_selects is enabled, return a copy of the SELECT with only the target column.
2407fn trim_source(select_expr: &Expression, target_expr: &Expression) -> Expression {
2408    if let Expression::Select(select) = select_expr {
2409        let mut trimmed = select.as_ref().clone();
2410        trimmed.expressions = vec![target_expr.clone()];
2411        Expression::Select(Box::new(trimmed))
2412    } else {
2413        select_expr.clone()
2414    }
2415}
2416
2417/// Find the child scope (CTE or derived table) for a given source name.
2418fn find_child_scope<'a>(scope: &'a Scope, source_name: &str) -> Option<&'a Scope> {
2419    // Check CTE scopes
2420    if scope.cte_sources.contains_key(source_name) {
2421        for cte_scope in &scope.cte_scopes {
2422            if let Expression::Cte(cte) = &cte_scope.expression {
2423                if cte.alias.name == source_name {
2424                    return Some(cte_scope);
2425                }
2426            }
2427        }
2428    }
2429
2430    // Check derived table scopes
2431    if let Some(source_info) = scope.sources.get(source_name) {
2432        if source_info.is_scope && !scope.cte_sources.contains_key(source_name) {
2433            if let Some(query) = derived_source_query(&source_info.expression) {
2434                for dt_scope in &scope.derived_table_scopes {
2435                    if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2436                        return Some(dt_scope);
2437                    }
2438                }
2439            }
2440        }
2441    }
2442
2443    None
2444}
2445
2446/// Find a CTE scope by name, searching through a combined list of CTE scopes.
2447/// This handles nested CTEs where the current scope doesn't have the CTE scope
2448/// as a direct child but knows about it via cte_sources.
2449fn find_child_scope_in<'a>(
2450    all_cte_scopes: &[&'a Scope],
2451    scope: &'a Scope,
2452    source_name: &str,
2453) -> Option<&'a Scope> {
2454    // First try the scope's own cte_scopes
2455    for cte_scope in &scope.cte_scopes {
2456        if let Expression::Cte(cte) = &cte_scope.expression {
2457            if cte.alias.name == source_name {
2458                return Some(cte_scope);
2459            }
2460        }
2461    }
2462
2463    // Then search through all ancestor CTE scopes
2464    for cte_scope in all_cte_scopes {
2465        if let Expression::Cte(cte) = &cte_scope.expression {
2466            if cte.alias.name == source_name {
2467                return Some(cte_scope);
2468            }
2469        }
2470    }
2471
2472    // Fall back to derived table scopes
2473    if let Some(source_info) = scope.sources.get(source_name) {
2474        if source_info.is_scope {
2475            if let Some(query) = derived_source_query(&source_info.expression) {
2476                for dt_scope in &scope.derived_table_scopes {
2477                    if expressions_equivalent_after_wrappers(&dt_scope.expression, query) {
2478                        return Some(dt_scope);
2479                    }
2480                }
2481            }
2482        }
2483    }
2484
2485    None
2486}
2487
2488/// Create a terminal lineage node for a table.column reference.
2489fn make_table_column_node(table: &str, column: &str) -> LineageNode {
2490    let mut node = LineageNode::new(
2491        format!("{}.{}", table, column),
2492        Expression::Column(Box::new(crate::expressions::Column {
2493            name: crate::expressions::Identifier::new(column.to_string()),
2494            table: Some(crate::expressions::Identifier::new(table.to_string())),
2495            join_mark: false,
2496            trailing_comments: vec![],
2497            span: None,
2498            inferred_type: None,
2499        })),
2500        Expression::Table(Box::new(crate::expressions::TableRef::new(table))),
2501    );
2502    node.source_name = table.to_string();
2503    node.source_kind = SourceKind::Table;
2504    node
2505}
2506
2507fn table_name_from_table_ref(table_ref: &crate::expressions::TableRef) -> String {
2508    let mut parts: Vec<String> = Vec::new();
2509    if let Some(catalog) = &table_ref.catalog {
2510        parts.push(catalog.name.clone());
2511    }
2512    if let Some(schema) = &table_ref.schema {
2513        parts.push(schema.name.clone());
2514    }
2515    parts.push(table_ref.name.name.clone());
2516    parts.join(".")
2517}
2518
2519fn apply_source_info_context(
2520    node: &mut LineageNode,
2521    source_key: &str,
2522    source_info: &ScopeSourceInfo,
2523) {
2524    node.source_kind = source_info.kind;
2525    node.source_name =
2526        source_info
2527            .lineage_name
2528            .clone()
2529            .unwrap_or_else(|| match &source_info.expression {
2530                Expression::Table(table_ref) => table_name_from_table_ref(table_ref),
2531                _ => source_key.to_string(),
2532            });
2533    node.source_alias = source_info.alias.clone();
2534}
2535
2536fn make_table_column_node_from_source(
2537    source_key: &str,
2538    column: &str,
2539    source_info: &ScopeSourceInfo,
2540) -> LineageNode {
2541    let lineage_name = source_info.lineage_name.as_deref().unwrap_or(source_key);
2542    let mut node = LineageNode::new(
2543        format!("{}.{}", lineage_name, column),
2544        Expression::Column(Box::new(crate::expressions::Column {
2545            name: crate::expressions::Identifier::new(column.to_string()),
2546            table: Some(crate::expressions::Identifier::new(
2547                lineage_name.to_string(),
2548            )),
2549            join_mark: false,
2550            trailing_comments: vec![],
2551            span: None,
2552            inferred_type: None,
2553        })),
2554        source_info.expression.clone(),
2555    );
2556
2557    apply_source_info_context(&mut node, source_key, source_info);
2558
2559    node
2560}
2561
2562/// Simple column reference extracted from an expression
2563#[derive(Debug, Clone)]
2564struct SimpleColumnRef {
2565    table: Option<crate::expressions::Identifier>,
2566    column: String,
2567}
2568
2569/// Find all column references in an expression (does not recurse into subqueries).
2570fn find_column_refs_in_expr(
2571    expr: &Expression,
2572    dialect: Option<DialectType>,
2573) -> Vec<SimpleColumnRef> {
2574    let mut refs = Vec::new();
2575    collect_column_refs(expr, dialect, &mut refs, None);
2576    refs
2577}
2578
2579fn find_column_refs_in_expr_with_select(
2580    expr: &Expression,
2581    select_expr: &Expression,
2582    dialect: Option<DialectType>,
2583) -> Vec<SimpleColumnRef> {
2584    let named_windows = match select_expr {
2585        Expression::Select(select) => select.windows.as_deref(),
2586        _ => None,
2587    };
2588    let mut refs = Vec::new();
2589    collect_column_refs(expr, dialect, &mut refs, named_windows);
2590    refs
2591}
2592
2593fn is_bigquery_safe_namespace_receiver(expr: &Expression) -> bool {
2594    match expr {
2595        Expression::Column(col) => {
2596            col.table.is_none() && !col.name.quoted && col.name.name.eq_ignore_ascii_case("SAFE")
2597        }
2598        Expression::Identifier(id) => !id.quoted && id.name.eq_ignore_ascii_case("SAFE"),
2599        _ => false,
2600    }
2601}
2602
2603fn collect_column_refs(
2604    expr: &Expression,
2605    dialect: Option<DialectType>,
2606    refs: &mut Vec<SimpleColumnRef>,
2607    named_windows: Option<&[NamedWindow]>,
2608) {
2609    let mut stack: Vec<&Expression> = vec![expr];
2610
2611    while let Some(current) = stack.pop() {
2612        match current {
2613            // === Leaf: collect Column references ===
2614            Expression::Column(col) => {
2615                refs.push(SimpleColumnRef {
2616                    table: col.table.clone(),
2617                    column: col.name.name.clone(),
2618                });
2619            }
2620
2621            // === Boundary: don't recurse into subqueries (handled separately) ===
2622            Expression::Subquery(_) | Expression::Exists(_) => {}
2623
2624            // === BinaryOp variants: left, right ===
2625            Expression::And(op)
2626            | Expression::Or(op)
2627            | Expression::Eq(op)
2628            | Expression::Neq(op)
2629            | Expression::Lt(op)
2630            | Expression::Lte(op)
2631            | Expression::Gt(op)
2632            | Expression::Gte(op)
2633            | Expression::Add(op)
2634            | Expression::Sub(op)
2635            | Expression::Mul(op)
2636            | Expression::Div(op)
2637            | Expression::Mod(op)
2638            | Expression::BitwiseAnd(op)
2639            | Expression::BitwiseOr(op)
2640            | Expression::BitwiseXor(op)
2641            | Expression::BitwiseLeftShift(op)
2642            | Expression::BitwiseRightShift(op)
2643            | Expression::Concat(op)
2644            | Expression::Adjacent(op)
2645            | Expression::TsMatch(op)
2646            | Expression::PropertyEQ(op)
2647            | Expression::ArrayContainsAll(op)
2648            | Expression::ArrayContainedBy(op)
2649            | Expression::ArrayOverlaps(op)
2650            | Expression::JSONBContainsAllTopKeys(op)
2651            | Expression::JSONBContainsAnyTopKeys(op)
2652            | Expression::JSONBDeleteAtPath(op)
2653            | Expression::ExtendsLeft(op)
2654            | Expression::ExtendsRight(op)
2655            | Expression::Is(op)
2656            | Expression::MemberOf(op)
2657            | Expression::NullSafeEq(op)
2658            | Expression::NullSafeNeq(op)
2659            | Expression::Glob(op)
2660            | Expression::Match(op) => {
2661                stack.push(&op.left);
2662                stack.push(&op.right);
2663            }
2664
2665            // === UnaryOp variants: this ===
2666            Expression::Not(u) | Expression::Neg(u) | Expression::BitwiseNot(u) => {
2667                stack.push(&u.this);
2668            }
2669
2670            // === UnaryFunc variants: this ===
2671            Expression::Upper(f)
2672            | Expression::Lower(f)
2673            | Expression::Length(f)
2674            | Expression::LTrim(f)
2675            | Expression::RTrim(f)
2676            | Expression::Reverse(f)
2677            | Expression::Abs(f)
2678            | Expression::Sqrt(f)
2679            | Expression::Cbrt(f)
2680            | Expression::Ln(f)
2681            | Expression::Exp(f)
2682            | Expression::Sign(f)
2683            | Expression::Date(f)
2684            | Expression::Time(f)
2685            | Expression::DateFromUnixDate(f)
2686            | Expression::UnixDate(f)
2687            | Expression::UnixSeconds(f)
2688            | Expression::UnixMillis(f)
2689            | Expression::UnixMicros(f)
2690            | Expression::TimeStrToDate(f)
2691            | Expression::DateToDi(f)
2692            | Expression::DiToDate(f)
2693            | Expression::TsOrDiToDi(f)
2694            | Expression::TsOrDsToDatetime(f)
2695            | Expression::TsOrDsToTimestamp(f)
2696            | Expression::YearOfWeek(f)
2697            | Expression::YearOfWeekIso(f)
2698            | Expression::Initcap(f)
2699            | Expression::Ascii(f)
2700            | Expression::Chr(f)
2701            | Expression::Soundex(f)
2702            | Expression::ByteLength(f)
2703            | Expression::Hex(f)
2704            | Expression::LowerHex(f)
2705            | Expression::Unicode(f)
2706            | Expression::Radians(f)
2707            | Expression::Degrees(f)
2708            | Expression::Sin(f)
2709            | Expression::Cos(f)
2710            | Expression::Tan(f)
2711            | Expression::Asin(f)
2712            | Expression::Acos(f)
2713            | Expression::Atan(f)
2714            | Expression::IsNan(f)
2715            | Expression::IsInf(f)
2716            | Expression::ArrayLength(f)
2717            | Expression::ArraySize(f)
2718            | Expression::Cardinality(f)
2719            | Expression::ArrayReverse(f)
2720            | Expression::ArrayDistinct(f)
2721            | Expression::ArrayFlatten(f)
2722            | Expression::ArrayCompact(f)
2723            | Expression::Explode(f)
2724            | Expression::ExplodeOuter(f)
2725            | Expression::ToArray(f)
2726            | Expression::MapFromEntries(f)
2727            | Expression::MapKeys(f)
2728            | Expression::MapValues(f)
2729            | Expression::JsonArrayLength(f)
2730            | Expression::JsonKeys(f)
2731            | Expression::JsonType(f)
2732            | Expression::ParseJson(f)
2733            | Expression::ToJson(f)
2734            | Expression::Typeof(f)
2735            | Expression::BitwiseCount(f)
2736            | Expression::Year(f)
2737            | Expression::Month(f)
2738            | Expression::Day(f)
2739            | Expression::Hour(f)
2740            | Expression::Minute(f)
2741            | Expression::Second(f)
2742            | Expression::DayOfWeek(f)
2743            | Expression::DayOfWeekIso(f)
2744            | Expression::DayOfMonth(f)
2745            | Expression::DayOfYear(f)
2746            | Expression::WeekOfYear(f)
2747            | Expression::Quarter(f)
2748            | Expression::Epoch(f)
2749            | Expression::EpochMs(f)
2750            | Expression::TimeStrToUnix(f)
2751            | Expression::SHA(f)
2752            | Expression::SHA1Digest(f)
2753            | Expression::TimeToUnix(f)
2754            | Expression::JSONBool(f)
2755            | Expression::Int64(f)
2756            | Expression::MD5NumberLower64(f)
2757            | Expression::MD5NumberUpper64(f)
2758            | Expression::DateStrToDate(f)
2759            | Expression::DateToDateStr(f) => {
2760                stack.push(&f.this);
2761            }
2762
2763            // === BinaryFunc variants: this, expression ===
2764            Expression::Power(f)
2765            | Expression::NullIf(f)
2766            | Expression::IfNull(f)
2767            | Expression::Nvl(f)
2768            | Expression::UnixToTimeStr(f)
2769            | Expression::Contains(f)
2770            | Expression::StartsWith(f)
2771            | Expression::EndsWith(f)
2772            | Expression::Levenshtein(f)
2773            | Expression::ModFunc(f)
2774            | Expression::Atan2(f)
2775            | Expression::IntDiv(f)
2776            | Expression::AddMonths(f)
2777            | Expression::MonthsBetween(f)
2778            | Expression::NextDay(f)
2779            | Expression::ArrayContains(f)
2780            | Expression::ArrayPosition(f)
2781            | Expression::ArrayAppend(f)
2782            | Expression::ArrayPrepend(f)
2783            | Expression::ArrayUnion(f)
2784            | Expression::ArrayExcept(f)
2785            | Expression::ArrayRemove(f)
2786            | Expression::StarMap(f)
2787            | Expression::MapFromArrays(f)
2788            | Expression::MapContainsKey(f)
2789            | Expression::ElementAt(f)
2790            | Expression::JsonMergePatch(f)
2791            | Expression::JSONBContains(f)
2792            | Expression::JSONBExtract(f) => {
2793                stack.push(&f.this);
2794                stack.push(&f.expression);
2795            }
2796
2797            // === VarArgFunc variants: expressions ===
2798            Expression::Greatest(f)
2799            | Expression::Least(f)
2800            | Expression::Coalesce(f)
2801            | Expression::ArrayConcat(f)
2802            | Expression::ArrayIntersect(f)
2803            | Expression::ArrayZip(f)
2804            | Expression::MapConcat(f)
2805            | Expression::JsonArray(f) => {
2806                for e in &f.expressions {
2807                    stack.push(e);
2808                }
2809            }
2810
2811            // === AggFunc variants: this, filter, having_max, limit ===
2812            Expression::Sum(f)
2813            | Expression::Avg(f)
2814            | Expression::Min(f)
2815            | Expression::Max(f)
2816            | Expression::ArrayAgg(f)
2817            | Expression::CountIf(f)
2818            | Expression::Stddev(f)
2819            | Expression::StddevPop(f)
2820            | Expression::StddevSamp(f)
2821            | Expression::Variance(f)
2822            | Expression::VarPop(f)
2823            | Expression::VarSamp(f)
2824            | Expression::Median(f)
2825            | Expression::Mode(f)
2826            | Expression::First(f)
2827            | Expression::Last(f)
2828            | Expression::AnyValue(f)
2829            | Expression::ApproxDistinct(f)
2830            | Expression::ApproxCountDistinct(f)
2831            | Expression::LogicalAnd(f)
2832            | Expression::LogicalOr(f)
2833            | Expression::Skewness(f)
2834            | Expression::ArrayConcatAgg(f)
2835            | Expression::ArrayUniqueAgg(f)
2836            | Expression::BoolXorAgg(f)
2837            | Expression::BitwiseAndAgg(f)
2838            | Expression::BitwiseOrAgg(f)
2839            | Expression::BitwiseXorAgg(f) => {
2840                stack.push(&f.this);
2841                if let Some(ref filter) = f.filter {
2842                    stack.push(filter);
2843                }
2844                if let Some((ref expr, _)) = f.having_max {
2845                    stack.push(expr);
2846                }
2847                if let Some(ref limit) = f.limit {
2848                    stack.push(limit);
2849                }
2850            }
2851
2852            // === Generic Function / AggregateFunction: args ===
2853            Expression::Function(func) => {
2854                for arg in &func.args {
2855                    stack.push(arg);
2856                }
2857            }
2858            Expression::AggregateFunction(func) => {
2859                for arg in &func.args {
2860                    stack.push(arg);
2861                }
2862                if let Some(ref filter) = func.filter {
2863                    stack.push(filter);
2864                }
2865                if let Some(ref limit) = func.limit {
2866                    stack.push(limit);
2867                }
2868            }
2869
2870            // === WindowFunction: this (skip Over for lineage purposes) ===
2871            Expression::WindowFunction(wf) => {
2872                stack.push(&wf.this);
2873                for e in &wf.over.partition_by {
2874                    stack.push(e);
2875                }
2876                for e in &wf.over.order_by {
2877                    stack.push(&e.this);
2878                }
2879                if let Some(keep) = &wf.keep {
2880                    for e in &keep.order_by {
2881                        stack.push(&e.this);
2882                    }
2883                }
2884                if let (Some(window_name), Some(named_windows)) =
2885                    (&wf.over.window_name, named_windows)
2886                {
2887                    for named_window in named_windows {
2888                        if named_window
2889                            .name
2890                            .name
2891                            .eq_ignore_ascii_case(&window_name.name)
2892                        {
2893                            for e in &named_window.spec.partition_by {
2894                                stack.push(e);
2895                            }
2896                            for e in &named_window.spec.order_by {
2897                                stack.push(&e.this);
2898                            }
2899                        }
2900                    }
2901                }
2902            }
2903
2904            // === Containers and special expressions ===
2905            Expression::Alias(a) => {
2906                stack.push(&a.this);
2907            }
2908            Expression::Cast(c) | Expression::TryCast(c) | Expression::SafeCast(c) => {
2909                stack.push(&c.this);
2910                if let Some(ref fmt) = c.format {
2911                    stack.push(fmt);
2912                }
2913                if let Some(ref def) = c.default {
2914                    stack.push(def);
2915                }
2916            }
2917            Expression::Paren(p) => {
2918                stack.push(&p.this);
2919            }
2920            Expression::Annotated(a) => {
2921                stack.push(&a.this);
2922            }
2923            Expression::Case(case) => {
2924                if let Some(ref operand) = case.operand {
2925                    stack.push(operand);
2926                }
2927                for (cond, result) in &case.whens {
2928                    stack.push(cond);
2929                    stack.push(result);
2930                }
2931                if let Some(ref else_expr) = case.else_ {
2932                    stack.push(else_expr);
2933                }
2934            }
2935            Expression::Collation(c) => {
2936                stack.push(&c.this);
2937            }
2938            Expression::In(i) => {
2939                stack.push(&i.this);
2940                for e in &i.expressions {
2941                    stack.push(e);
2942                }
2943                if let Some(ref q) = i.query {
2944                    stack.push(q);
2945                }
2946                if let Some(ref u) = i.unnest {
2947                    stack.push(u);
2948                }
2949            }
2950            Expression::Between(b) => {
2951                stack.push(&b.this);
2952                stack.push(&b.low);
2953                stack.push(&b.high);
2954            }
2955            Expression::IsNull(n) => {
2956                stack.push(&n.this);
2957            }
2958            Expression::IsTrue(t) | Expression::IsFalse(t) => {
2959                stack.push(&t.this);
2960            }
2961            Expression::IsJson(j) => {
2962                stack.push(&j.this);
2963            }
2964            Expression::Like(l) | Expression::ILike(l) => {
2965                stack.push(&l.left);
2966                stack.push(&l.right);
2967                if let Some(ref esc) = l.escape {
2968                    stack.push(esc);
2969                }
2970            }
2971            Expression::SimilarTo(s) => {
2972                stack.push(&s.this);
2973                stack.push(&s.pattern);
2974                if let Some(ref esc) = s.escape {
2975                    stack.push(esc);
2976                }
2977            }
2978            Expression::Ordered(o) => {
2979                stack.push(&o.this);
2980            }
2981            Expression::Array(a) => {
2982                for e in &a.expressions {
2983                    stack.push(e);
2984                }
2985            }
2986            Expression::Tuple(t) => {
2987                for e in &t.expressions {
2988                    stack.push(e);
2989                }
2990            }
2991            Expression::Struct(s) => {
2992                for (_, e) in &s.fields {
2993                    stack.push(e);
2994                }
2995            }
2996            Expression::Subscript(s) => {
2997                stack.push(&s.this);
2998                stack.push(&s.index);
2999            }
3000            Expression::Dot(d) => {
3001                stack.push(&d.this);
3002            }
3003            Expression::MethodCall(m) => {
3004                if !matches!(dialect, Some(DialectType::BigQuery))
3005                    || !is_bigquery_safe_namespace_receiver(&m.this)
3006                {
3007                    stack.push(&m.this);
3008                }
3009                for arg in &m.args {
3010                    stack.push(arg);
3011                }
3012            }
3013            Expression::ArraySlice(s) => {
3014                stack.push(&s.this);
3015                if let Some(ref start) = s.start {
3016                    stack.push(start);
3017                }
3018                if let Some(ref end) = s.end {
3019                    stack.push(end);
3020                }
3021            }
3022            Expression::Lambda(l) => {
3023                stack.push(&l.body);
3024            }
3025            Expression::NamedArgument(n) => {
3026                stack.push(&n.value);
3027            }
3028            Expression::Lateral(l) => {
3029                stack.push(&l.this);
3030                if let Some(ref view) = l.view {
3031                    stack.push(view);
3032                }
3033                if let Some(ref outer) = l.outer {
3034                    stack.push(outer);
3035                }
3036                if let Some(ref ordinality) = l.ordinality {
3037                    stack.push(ordinality);
3038                }
3039            }
3040            Expression::LateralView(lv) => {
3041                stack.push(&lv.this);
3042            }
3043            Expression::TryCatch(t) => {
3044                for stmt in &t.try_body {
3045                    stack.push(stmt);
3046                }
3047                if let Some(catch_body) = &t.catch_body {
3048                    for stmt in catch_body {
3049                        stack.push(stmt);
3050                    }
3051                }
3052            }
3053            Expression::BracedWildcard(e) | Expression::ReturnStmt(e) => {
3054                stack.push(e);
3055            }
3056
3057            // === Custom function structs ===
3058            Expression::Substring(f) => {
3059                stack.push(&f.this);
3060                stack.push(&f.start);
3061                if let Some(ref len) = f.length {
3062                    stack.push(len);
3063                }
3064            }
3065            Expression::Trim(f) => {
3066                stack.push(&f.this);
3067                if let Some(ref chars) = f.characters {
3068                    stack.push(chars);
3069                }
3070            }
3071            Expression::Replace(f) => {
3072                stack.push(&f.this);
3073                stack.push(&f.old);
3074                stack.push(&f.new);
3075            }
3076            Expression::IfFunc(f) => {
3077                stack.push(&f.condition);
3078                stack.push(&f.true_value);
3079                if let Some(ref fv) = f.false_value {
3080                    stack.push(fv);
3081                }
3082            }
3083            Expression::Nvl2(f) => {
3084                stack.push(&f.this);
3085                stack.push(&f.true_value);
3086                stack.push(&f.false_value);
3087            }
3088            Expression::ConcatWs(f) => {
3089                stack.push(&f.separator);
3090                for e in &f.expressions {
3091                    stack.push(e);
3092                }
3093            }
3094            Expression::Count(f) => {
3095                if let Some(ref this) = f.this {
3096                    stack.push(this);
3097                }
3098                if let Some(ref filter) = f.filter {
3099                    stack.push(filter);
3100                }
3101            }
3102            Expression::GroupConcat(f) => {
3103                stack.push(&f.this);
3104                if let Some(ref sep) = f.separator {
3105                    stack.push(sep);
3106                }
3107                if let Some(ref filter) = f.filter {
3108                    stack.push(filter);
3109                }
3110            }
3111            Expression::StringAgg(f) => {
3112                stack.push(&f.this);
3113                if let Some(ref sep) = f.separator {
3114                    stack.push(sep);
3115                }
3116                if let Some(ref filter) = f.filter {
3117                    stack.push(filter);
3118                }
3119                if let Some(ref limit) = f.limit {
3120                    stack.push(limit);
3121                }
3122            }
3123            Expression::ListAgg(f) => {
3124                stack.push(&f.this);
3125                if let Some(ref sep) = f.separator {
3126                    stack.push(sep);
3127                }
3128                if let Some(ref filter) = f.filter {
3129                    stack.push(filter);
3130                }
3131            }
3132            Expression::SumIf(f) => {
3133                stack.push(&f.this);
3134                stack.push(&f.condition);
3135                if let Some(ref filter) = f.filter {
3136                    stack.push(filter);
3137                }
3138            }
3139            Expression::DateAdd(f) | Expression::DateSub(f) => {
3140                stack.push(&f.this);
3141                stack.push(&f.interval);
3142            }
3143            Expression::DateDiff(f) => {
3144                stack.push(&f.this);
3145                stack.push(&f.expression);
3146            }
3147            Expression::DateTrunc(f) | Expression::TimestampTrunc(f) => {
3148                stack.push(&f.this);
3149            }
3150            Expression::Extract(f) => {
3151                stack.push(&f.this);
3152            }
3153            Expression::Round(f) => {
3154                stack.push(&f.this);
3155                if let Some(ref d) = f.decimals {
3156                    stack.push(d);
3157                }
3158            }
3159            Expression::Floor(f) => {
3160                stack.push(&f.this);
3161                if let Some(ref s) = f.scale {
3162                    stack.push(s);
3163                }
3164                if let Some(ref t) = f.to {
3165                    stack.push(t);
3166                }
3167            }
3168            Expression::Ceil(f) => {
3169                stack.push(&f.this);
3170                if let Some(ref d) = f.decimals {
3171                    stack.push(d);
3172                }
3173                if let Some(ref t) = f.to {
3174                    stack.push(t);
3175                }
3176            }
3177            Expression::Log(f) => {
3178                stack.push(&f.this);
3179                if let Some(ref b) = f.base {
3180                    stack.push(b);
3181                }
3182            }
3183            Expression::AtTimeZone(f) => {
3184                stack.push(&f.this);
3185                stack.push(&f.zone);
3186            }
3187            Expression::Lead(f) | Expression::Lag(f) => {
3188                stack.push(&f.this);
3189                if let Some(ref off) = f.offset {
3190                    stack.push(off);
3191                }
3192                if let Some(ref def) = f.default {
3193                    stack.push(def);
3194                }
3195            }
3196            Expression::FirstValue(f) | Expression::LastValue(f) => {
3197                stack.push(&f.this);
3198            }
3199            Expression::NthValue(f) => {
3200                stack.push(&f.this);
3201                stack.push(&f.offset);
3202            }
3203            Expression::Position(f) => {
3204                stack.push(&f.substring);
3205                stack.push(&f.string);
3206                if let Some(ref start) = f.start {
3207                    stack.push(start);
3208                }
3209            }
3210            Expression::Decode(f) => {
3211                stack.push(&f.this);
3212                for (search, result) in &f.search_results {
3213                    stack.push(search);
3214                    stack.push(result);
3215                }
3216                if let Some(ref def) = f.default {
3217                    stack.push(def);
3218                }
3219            }
3220            Expression::CharFunc(f) => {
3221                for arg in &f.args {
3222                    stack.push(arg);
3223                }
3224            }
3225            Expression::ArraySort(f) => {
3226                stack.push(&f.this);
3227                if let Some(ref cmp) = f.comparator {
3228                    stack.push(cmp);
3229                }
3230            }
3231            Expression::ArrayJoin(f) | Expression::ArrayToString(f) => {
3232                stack.push(&f.this);
3233                stack.push(&f.separator);
3234                if let Some(ref nr) = f.null_replacement {
3235                    stack.push(nr);
3236                }
3237            }
3238            Expression::ArrayFilter(f) => {
3239                stack.push(&f.this);
3240                stack.push(&f.filter);
3241            }
3242            Expression::ArrayTransform(f) => {
3243                stack.push(&f.this);
3244                stack.push(&f.transform);
3245            }
3246            Expression::Sequence(f)
3247            | Expression::Generate(f)
3248            | Expression::ExplodingGenerateSeries(f) => {
3249                stack.push(&f.start);
3250                stack.push(&f.stop);
3251                if let Some(ref step) = f.step {
3252                    stack.push(step);
3253                }
3254            }
3255            Expression::JsonExtract(f)
3256            | Expression::JsonExtractScalar(f)
3257            | Expression::JsonQuery(f)
3258            | Expression::JsonValue(f) => {
3259                stack.push(&f.this);
3260                stack.push(&f.path);
3261            }
3262            Expression::JsonExtractPath(f) | Expression::JsonRemove(f) => {
3263                stack.push(&f.this);
3264                for p in &f.paths {
3265                    stack.push(p);
3266                }
3267            }
3268            Expression::JsonObject(f) => {
3269                for (k, v) in &f.pairs {
3270                    stack.push(k);
3271                    stack.push(v);
3272                }
3273            }
3274            Expression::JsonSet(f) | Expression::JsonInsert(f) => {
3275                stack.push(&f.this);
3276                for (path, val) in &f.path_values {
3277                    stack.push(path);
3278                    stack.push(val);
3279                }
3280            }
3281            Expression::Overlay(f) => {
3282                stack.push(&f.this);
3283                stack.push(&f.replacement);
3284                stack.push(&f.from);
3285                if let Some(ref len) = f.length {
3286                    stack.push(len);
3287                }
3288            }
3289            Expression::Convert(f) => {
3290                stack.push(&f.this);
3291                if let Some(ref style) = f.style {
3292                    stack.push(style);
3293                }
3294            }
3295            Expression::ApproxPercentile(f) => {
3296                stack.push(&f.this);
3297                stack.push(&f.percentile);
3298                if let Some(ref acc) = f.accuracy {
3299                    stack.push(acc);
3300                }
3301                if let Some(ref filter) = f.filter {
3302                    stack.push(filter);
3303                }
3304            }
3305            Expression::Percentile(f)
3306            | Expression::PercentileCont(f)
3307            | Expression::PercentileDisc(f) => {
3308                stack.push(&f.this);
3309                stack.push(&f.percentile);
3310                if let Some(ref filter) = f.filter {
3311                    stack.push(filter);
3312                }
3313            }
3314            Expression::WithinGroup(f) => {
3315                stack.push(&f.this);
3316                for e in &f.order_by {
3317                    stack.push(&e.this);
3318                }
3319            }
3320            Expression::Left(f) | Expression::Right(f) => {
3321                stack.push(&f.this);
3322                stack.push(&f.length);
3323            }
3324            Expression::Repeat(f) => {
3325                stack.push(&f.this);
3326                stack.push(&f.times);
3327            }
3328            Expression::Lpad(f) | Expression::Rpad(f) => {
3329                stack.push(&f.this);
3330                stack.push(&f.length);
3331                if let Some(ref fill) = f.fill {
3332                    stack.push(fill);
3333                }
3334            }
3335            Expression::Split(f) => {
3336                stack.push(&f.this);
3337                stack.push(&f.delimiter);
3338            }
3339            Expression::RegexpLike(f) => {
3340                stack.push(&f.this);
3341                stack.push(&f.pattern);
3342                if let Some(ref flags) = f.flags {
3343                    stack.push(flags);
3344                }
3345            }
3346            Expression::RegexpReplace(f) => {
3347                stack.push(&f.this);
3348                stack.push(&f.pattern);
3349                stack.push(&f.replacement);
3350                if let Some(ref flags) = f.flags {
3351                    stack.push(flags);
3352                }
3353            }
3354            Expression::RegexpExtract(f) => {
3355                stack.push(&f.this);
3356                stack.push(&f.pattern);
3357                if let Some(ref group) = f.group {
3358                    stack.push(group);
3359                }
3360            }
3361            Expression::ToDate(f) => {
3362                stack.push(&f.this);
3363                if let Some(ref fmt) = f.format {
3364                    stack.push(fmt);
3365                }
3366            }
3367            Expression::ToTimestamp(f) => {
3368                stack.push(&f.this);
3369                if let Some(ref fmt) = f.format {
3370                    stack.push(fmt);
3371                }
3372            }
3373            Expression::DateFormat(f) | Expression::FormatDate(f) => {
3374                stack.push(&f.this);
3375                stack.push(&f.format);
3376            }
3377            Expression::LastDay(f) => {
3378                stack.push(&f.this);
3379            }
3380            Expression::FromUnixtime(f) => {
3381                stack.push(&f.this);
3382                if let Some(ref fmt) = f.format {
3383                    stack.push(fmt);
3384                }
3385            }
3386            Expression::UnixTimestamp(f) => {
3387                if let Some(ref this) = f.this {
3388                    stack.push(this);
3389                }
3390                if let Some(ref fmt) = f.format {
3391                    stack.push(fmt);
3392                }
3393            }
3394            Expression::MakeDate(f) => {
3395                stack.push(&f.year);
3396                stack.push(&f.month);
3397                stack.push(&f.day);
3398            }
3399            Expression::MakeTimestamp(f) => {
3400                stack.push(&f.year);
3401                stack.push(&f.month);
3402                stack.push(&f.day);
3403                stack.push(&f.hour);
3404                stack.push(&f.minute);
3405                stack.push(&f.second);
3406                if let Some(ref tz) = f.timezone {
3407                    stack.push(tz);
3408                }
3409            }
3410            Expression::TruncFunc(f) => {
3411                stack.push(&f.this);
3412                if let Some(ref d) = f.decimals {
3413                    stack.push(d);
3414                }
3415            }
3416            Expression::ArrayFunc(f) => {
3417                for e in &f.expressions {
3418                    stack.push(e);
3419                }
3420            }
3421            Expression::Unnest(f) => {
3422                stack.push(&f.this);
3423                for e in &f.expressions {
3424                    stack.push(e);
3425                }
3426            }
3427            Expression::StructFunc(f) => {
3428                for (_, e) in &f.fields {
3429                    stack.push(e);
3430                }
3431            }
3432            Expression::StructExtract(f) => {
3433                stack.push(&f.this);
3434            }
3435            Expression::NamedStruct(f) => {
3436                for (k, v) in &f.pairs {
3437                    stack.push(k);
3438                    stack.push(v);
3439                }
3440            }
3441            Expression::MapFunc(f) => {
3442                for k in &f.keys {
3443                    stack.push(k);
3444                }
3445                for v in &f.values {
3446                    stack.push(v);
3447                }
3448            }
3449            Expression::TransformKeys(f) | Expression::TransformValues(f) => {
3450                stack.push(&f.this);
3451                stack.push(&f.transform);
3452            }
3453            Expression::JsonArrayAgg(f) => {
3454                stack.push(&f.this);
3455                if let Some(ref filter) = f.filter {
3456                    stack.push(filter);
3457                }
3458            }
3459            Expression::JsonObjectAgg(f) => {
3460                stack.push(&f.key);
3461                stack.push(&f.value);
3462                if let Some(ref filter) = f.filter {
3463                    stack.push(filter);
3464                }
3465            }
3466            Expression::NTile(f) => {
3467                if let Some(ref n) = f.num_buckets {
3468                    stack.push(n);
3469                }
3470            }
3471            Expression::Rand(f) => {
3472                if let Some(ref s) = f.seed {
3473                    stack.push(s);
3474                }
3475                if let Some(ref lo) = f.lower {
3476                    stack.push(lo);
3477                }
3478                if let Some(ref hi) = f.upper {
3479                    stack.push(hi);
3480                }
3481            }
3482            Expression::Any(q) | Expression::All(q) => {
3483                stack.push(&q.this);
3484                stack.push(&q.subquery);
3485            }
3486            Expression::Overlaps(o) => {
3487                if let Some(ref this) = o.this {
3488                    stack.push(this);
3489                }
3490                if let Some(ref expr) = o.expression {
3491                    stack.push(expr);
3492                }
3493                if let Some(ref ls) = o.left_start {
3494                    stack.push(ls);
3495                }
3496                if let Some(ref le) = o.left_end {
3497                    stack.push(le);
3498                }
3499                if let Some(ref rs) = o.right_start {
3500                    stack.push(rs);
3501                }
3502                if let Some(ref re) = o.right_end {
3503                    stack.push(re);
3504                }
3505            }
3506            Expression::Interval(i) => {
3507                if let Some(ref this) = i.this {
3508                    stack.push(this);
3509                }
3510            }
3511            Expression::TimeStrToTime(f) => {
3512                stack.push(&f.this);
3513                if let Some(ref zone) = f.zone {
3514                    stack.push(zone);
3515                }
3516            }
3517            Expression::JSONBExtractScalar(f) => {
3518                stack.push(&f.this);
3519                stack.push(&f.expression);
3520                if let Some(ref jt) = f.json_type {
3521                    stack.push(jt);
3522                }
3523            }
3524            Expression::JSONExtract(f) => {
3525                stack.push(&f.this);
3526                stack.push(&f.expression);
3527                for e in &f.expressions {
3528                    stack.push(e);
3529                }
3530                if let Some(ref option) = f.option {
3531                    stack.push(option);
3532                }
3533                if let Some(ref on_condition) = f.on_condition {
3534                    stack.push(on_condition);
3535                }
3536            }
3537
3538            // === True leaves and non-expression-bearing nodes ===
3539            // Literals, Identifier, Star, DataType, Placeholder, Boolean, Null,
3540            // CurrentDate/Time/Timestamp, RowNumber, Rank, DenseRank, PercentRank,
3541            // CumeDist, Random, Pi, SessionUser, DDL statements, clauses, etc.
3542            _ => {}
3543        }
3544    }
3545}
3546
3547// ---------------------------------------------------------------------------
3548// Tests
3549// ---------------------------------------------------------------------------
3550
3551#[cfg(test)]
3552mod tests {
3553    use super::*;
3554    use crate::dialects::{Dialect, DialectType};
3555    use crate::expressions::DataType;
3556    use crate::optimizer::annotate_types::annotate_types;
3557    use crate::parse_one;
3558    use crate::schema::{MappingSchema, Schema};
3559
3560    fn parse(sql: &str) -> Expression {
3561        let dialect = Dialect::get(DialectType::Generic);
3562        let ast = dialect.parse(sql).unwrap();
3563        ast.into_iter().next().unwrap()
3564    }
3565
3566    fn parse_dialect(sql: &str, dialect_type: DialectType) -> Expression {
3567        let dialect = Dialect::get(dialect_type);
3568        let ast = dialect.parse(sql).unwrap();
3569        ast.into_iter().next().unwrap()
3570    }
3571
3572    fn lineage_names(node: &LineageNode) -> Vec<String> {
3573        node.walk().map(|n| n.name.clone()).collect()
3574    }
3575
3576    fn assert_lineage_contains(node: &LineageNode, expected: &str) {
3577        let names = lineage_names(node);
3578        assert!(
3579            names.iter().any(|name| name == expected),
3580            "expected {expected} in lineage, got {names:?}"
3581        );
3582    }
3583
3584    #[test]
3585    fn test_simple_lineage() {
3586        let expr = parse("SELECT a FROM t");
3587        let node = lineage("a", &expr, None, false).unwrap();
3588
3589        assert_eq!(node.name, "a");
3590        assert!(!node.downstream.is_empty(), "Should have downstream nodes");
3591        // Should trace to t.a
3592        let names = node.downstream_names();
3593        assert!(
3594            names.iter().any(|n| n == "t.a"),
3595            "Expected t.a in downstream, got: {:?}",
3596            names
3597        );
3598    }
3599
3600    #[test]
3601    fn test_lineage_walk() {
3602        let root = LineageNode {
3603            name: "col_a".to_string(),
3604            expression: Expression::Null(crate::expressions::Null),
3605            source: Expression::Null(crate::expressions::Null),
3606            downstream: vec![LineageNode::new(
3607                "t.a",
3608                Expression::Null(crate::expressions::Null),
3609                Expression::Null(crate::expressions::Null),
3610            )],
3611            source_name: String::new(),
3612            source_kind: SourceKind::Unknown,
3613            source_alias: None,
3614            reference_node_name: String::new(),
3615        };
3616
3617        let names: Vec<_> = root.walk().map(|n| n.name.clone()).collect();
3618        assert_eq!(names.len(), 2);
3619        assert_eq!(names[0], "col_a");
3620        assert_eq!(names[1], "t.a");
3621    }
3622
3623    #[test]
3624    fn test_aliased_column() {
3625        let expr = parse("SELECT a + 1 AS b FROM t");
3626        let node = lineage("b", &expr, None, false).unwrap();
3627
3628        assert_eq!(node.name, "b");
3629        // Should trace through the expression to t.a
3630        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
3631        assert!(
3632            all_names.iter().any(|n| n.contains("a")),
3633            "Expected to trace to column a, got: {:?}",
3634            all_names
3635        );
3636    }
3637
3638    #[test]
3639    fn test_qualified_column() {
3640        let expr = parse("SELECT t.a FROM t");
3641        let node = lineage("a", &expr, None, false).unwrap();
3642
3643        assert_eq!(node.name, "a");
3644        let names = node.downstream_names();
3645        assert!(
3646            names.iter().any(|n| n == "t.a"),
3647            "Expected t.a, got: {:?}",
3648            names
3649        );
3650    }
3651
3652    #[test]
3653    fn test_unqualified_column() {
3654        let expr = parse("SELECT a FROM t");
3655        let node = lineage("a", &expr, None, false).unwrap();
3656
3657        // Unqualified but single source → resolved to t.a
3658        let names = node.downstream_names();
3659        assert!(
3660            names.iter().any(|n| n == "t.a"),
3661            "Expected t.a, got: {:?}",
3662            names
3663        );
3664    }
3665
3666    #[test]
3667    fn test_lineage_with_schema_qualifies_root_expression_issue_40() {
3668        let query = "SELECT name FROM users";
3669        let dialect = Dialect::get(DialectType::BigQuery);
3670        let expr = dialect
3671            .parse(query)
3672            .unwrap()
3673            .into_iter()
3674            .next()
3675            .expect("expected one expression");
3676
3677        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3678        schema
3679            .add_table("users", &[("name".into(), DataType::Text)], None)
3680            .expect("schema setup");
3681
3682        let node_without_schema = lineage("name", &expr, Some(DialectType::BigQuery), false)
3683            .expect("lineage without schema");
3684        let mut expr_without = node_without_schema.expression.clone();
3685        annotate_types(
3686            &mut expr_without,
3687            Some(&schema),
3688            Some(DialectType::BigQuery),
3689        );
3690        assert_eq!(
3691            expr_without.inferred_type(),
3692            None,
3693            "Expected unresolved root type without schema-aware lineage qualification"
3694        );
3695
3696        let node_with_schema = lineage_with_schema(
3697            "name",
3698            &expr,
3699            Some(&schema),
3700            Some(DialectType::BigQuery),
3701            false,
3702        )
3703        .expect("lineage with schema");
3704        let mut expr_with = node_with_schema.expression.clone();
3705        annotate_types(&mut expr_with, Some(&schema), Some(DialectType::BigQuery));
3706
3707        assert_eq!(expr_with.inferred_type(), Some(&DataType::Text));
3708    }
3709
3710    #[test]
3711    fn test_lineage_with_schema_tolerates_partial_schema_for_known_column() {
3712        let expr = parse_dialect("SELECT order_id, amount FROM t", DialectType::DuckDB);
3713        let mut schema = MappingSchema::with_dialect(DialectType::DuckDB);
3714        schema
3715            .add_table(
3716                "t",
3717                &[("amount".into(), DataType::BigInt { length: None })],
3718                None,
3719            )
3720            .expect("schema setup");
3721
3722        let node = lineage_with_schema(
3723            "amount",
3724            &expr,
3725            Some(&schema),
3726            Some(DialectType::DuckDB),
3727            false,
3728        )
3729        .expect("lineage_with_schema should tolerate unrelated unknown columns");
3730
3731        assert_lineage_contains(&node, "t.amount");
3732    }
3733
3734    #[test]
3735    fn test_lineage_with_schema_tolerates_partial_schema_for_unknown_column() {
3736        let expr = parse_dialect("SELECT order_id, amount FROM t", DialectType::DuckDB);
3737        let mut schema = MappingSchema::with_dialect(DialectType::DuckDB);
3738        schema
3739            .add_table(
3740                "t",
3741                &[("amount".into(), DataType::BigInt { length: None })],
3742                None,
3743            )
3744            .expect("schema setup");
3745
3746        let node = lineage_with_schema(
3747            "order_id",
3748            &expr,
3749            Some(&schema),
3750            Some(DialectType::DuckDB),
3751            false,
3752        )
3753        .expect("lineage_with_schema should keep unknown selected columns");
3754
3755        assert_lineage_contains(&node, "t.order_id");
3756    }
3757
3758    #[test]
3759    fn test_lineage_with_schema_tolerates_partial_schema_for_join_conditions() {
3760        let expr = parse_dialect(
3761            "SELECT a.order_id, b.amount FROM t a JOIN u b ON a.id = b.id",
3762            DialectType::DuckDB,
3763        );
3764        let mut schema = MappingSchema::with_dialect(DialectType::DuckDB);
3765        schema
3766            .add_table(
3767                "t",
3768                &[("order_id".into(), DataType::BigInt { length: None })],
3769                None,
3770            )
3771            .expect("schema setup");
3772        schema
3773            .add_table(
3774                "u",
3775                &[("amount".into(), DataType::BigInt { length: None })],
3776                None,
3777            )
3778            .expect("schema setup");
3779
3780        let node = lineage_with_schema(
3781            "amount",
3782            &expr,
3783            Some(&schema),
3784            Some(DialectType::DuckDB),
3785            false,
3786        )
3787        .expect("lineage_with_schema should tolerate unknown join keys");
3788
3789        assert_lineage_contains(&node, "b.amount");
3790    }
3791
3792    #[test]
3793    fn test_lineage_with_schema_correlated_scalar_subquery() {
3794        let query = "SELECT id, (SELECT AVG(val) FROM t2 WHERE t2.id = t1.id) AS avg_val FROM t1";
3795        let dialect = Dialect::get(DialectType::BigQuery);
3796        let expr = dialect
3797            .parse(query)
3798            .unwrap()
3799            .into_iter()
3800            .next()
3801            .expect("expected one expression");
3802
3803        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3804        schema
3805            .add_table(
3806                "t1",
3807                &[("id".into(), DataType::BigInt { length: None })],
3808                None,
3809            )
3810            .expect("schema setup");
3811        schema
3812            .add_table(
3813                "t2",
3814                &[
3815                    ("id".into(), DataType::BigInt { length: None }),
3816                    ("val".into(), DataType::BigInt { length: None }),
3817                ],
3818                None,
3819            )
3820            .expect("schema setup");
3821
3822        let node = lineage_with_schema(
3823            "id",
3824            &expr,
3825            Some(&schema),
3826            Some(DialectType::BigQuery),
3827            false,
3828        )
3829        .expect("lineage_with_schema should handle correlated scalar subqueries");
3830
3831        assert_eq!(node.name, "id");
3832    }
3833
3834    #[test]
3835    fn test_lineage_with_schema_join_using() {
3836        let query = "SELECT a FROM t1 JOIN t2 USING(a)";
3837        let dialect = Dialect::get(DialectType::BigQuery);
3838        let expr = dialect
3839            .parse(query)
3840            .unwrap()
3841            .into_iter()
3842            .next()
3843            .expect("expected one expression");
3844
3845        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3846        schema
3847            .add_table(
3848                "t1",
3849                &[("a".into(), DataType::BigInt { length: None })],
3850                None,
3851            )
3852            .expect("schema setup");
3853        schema
3854            .add_table(
3855                "t2",
3856                &[("a".into(), DataType::BigInt { length: None })],
3857                None,
3858            )
3859            .expect("schema setup");
3860
3861        let node = lineage_with_schema(
3862            "a",
3863            &expr,
3864            Some(&schema),
3865            Some(DialectType::BigQuery),
3866            false,
3867        )
3868        .expect("lineage_with_schema should handle JOIN USING");
3869
3870        assert_eq!(node.name, "a");
3871    }
3872
3873    #[test]
3874    fn test_lineage_with_schema_qualified_table_name() {
3875        let query = "SELECT a FROM raw.t1";
3876        let dialect = Dialect::get(DialectType::BigQuery);
3877        let expr = dialect
3878            .parse(query)
3879            .unwrap()
3880            .into_iter()
3881            .next()
3882            .expect("expected one expression");
3883
3884        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3885        schema
3886            .add_table(
3887                "raw.t1",
3888                &[("a".into(), DataType::BigInt { length: None })],
3889                None,
3890            )
3891            .expect("schema setup");
3892
3893        let node = lineage_with_schema(
3894            "a",
3895            &expr,
3896            Some(&schema),
3897            Some(DialectType::BigQuery),
3898            false,
3899        )
3900        .expect("lineage_with_schema should handle dotted schema.table names");
3901
3902        assert_eq!(node.name, "a");
3903    }
3904
3905    #[test]
3906    fn test_lineage_with_schema_none_matches_lineage() {
3907        let expr = parse("SELECT a FROM t");
3908        let baseline = lineage("a", &expr, None, false).expect("lineage baseline");
3909        let with_none =
3910            lineage_with_schema("a", &expr, None, None, false).expect("lineage_with_schema");
3911
3912        assert_eq!(with_none.name, baseline.name);
3913        assert_eq!(with_none.downstream_names(), baseline.downstream_names());
3914    }
3915
3916    #[test]
3917    fn test_lineage_with_schema_bigquery_mixed_case_column_names_issue_60() {
3918        let dialect = Dialect::get(DialectType::BigQuery);
3919        let expr = dialect
3920            .parse("SELECT Name AS name FROM teams")
3921            .unwrap()
3922            .into_iter()
3923            .next()
3924            .expect("expected one expression");
3925
3926        let mut schema = MappingSchema::with_dialect(DialectType::BigQuery);
3927        schema
3928            .add_table(
3929                "teams",
3930                &[("Name".into(), DataType::String { length: None })],
3931                None,
3932            )
3933            .expect("schema setup");
3934
3935        let node = lineage_with_schema(
3936            "name",
3937            &expr,
3938            Some(&schema),
3939            Some(DialectType::BigQuery),
3940            false,
3941        )
3942        .expect("lineage_with_schema should resolve mixed-case BigQuery columns");
3943
3944        let names = node.downstream_names();
3945        assert!(
3946            names.iter().any(|n| n == "teams.Name"),
3947            "Expected teams.Name in downstream, got: {:?}",
3948            names
3949        );
3950    }
3951
3952    #[test]
3953    fn test_lineage_bigquery_mixed_case_alias_lookup() {
3954        let dialect = Dialect::get(DialectType::BigQuery);
3955        let expr = dialect
3956            .parse("SELECT Name AS Name FROM teams")
3957            .unwrap()
3958            .into_iter()
3959            .next()
3960            .expect("expected one expression");
3961
3962        let node = lineage("name", &expr, Some(DialectType::BigQuery), false)
3963            .expect("lineage should resolve mixed-case aliases in BigQuery");
3964
3965        assert_eq!(node.name, "name");
3966    }
3967
3968    #[test]
3969    fn test_lineage_bigquery_unnest_alias_source_issue_209() {
3970        let expr = parse_one(
3971            r#"
3972SELECT date_val AS week_start
3973FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
3974"#,
3975            DialectType::BigQuery,
3976        )
3977        .expect("parse");
3978
3979        let node = lineage("week_start", &expr, Some(DialectType::BigQuery), false)
3980            .expect("lineage should resolve UNNEST alias as a source");
3981        let child = node
3982            .downstream
3983            .first()
3984            .expect("week_start should have downstream lineage");
3985
3986        assert_eq!(child.name, "_0.date_val");
3987        assert_eq!(child.source_name, "_0");
3988        assert_eq!(child.source_kind, SourceKind::Virtual);
3989        assert_eq!(child.source_alias.as_deref(), Some("date_val"));
3990
3991        let Expression::Column(column) = &child.expression else {
3992            panic!(
3993                "expected downstream column expression, got {:?}",
3994                child.expression
3995            );
3996        };
3997        assert_eq!(column.name.name, "date_val");
3998        assert_eq!(
3999            column.table.as_ref().map(|table| table.name.as_str()),
4000            Some("_0")
4001        );
4002        assert!(
4003            matches!(&child.source, Expression::Alias(alias) if matches!(&alias.this, Expression::Unnest(_)) && alias.alias.name == "date_val"),
4004            "expected UNNEST source expression, got {:?}",
4005            child.source
4006        );
4007    }
4008
4009    #[test]
4010    fn test_lineage_real_table_named_like_unnest_alias_is_not_virtual() {
4011        let expr =
4012            parse_one("SELECT date_val.id FROM date_val", DialectType::BigQuery).expect("parse");
4013
4014        let node = lineage("id", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4015        let child = node.downstream.first().expect("id should have lineage");
4016
4017        assert_eq!(child.name, "date_val.id");
4018        assert_eq!(child.source_name, "date_val");
4019        assert_eq!(child.source_kind, SourceKind::Table);
4020        assert_eq!(child.source_alias, None);
4021    }
4022
4023    #[test]
4024    fn test_lineage_multiple_bigquery_unnest_sources_get_stable_virtual_names() {
4025        let expr = parse_one(
4026            r#"
4027SELECT a.a AS first_value, b.b AS second_value
4028FROM UNNEST(GENERATE_ARRAY(1, 2)) AS a
4029JOIN UNNEST(GENERATE_ARRAY(3, 4)) AS b ON TRUE
4030"#,
4031            DialectType::BigQuery,
4032        )
4033        .expect("parse");
4034
4035        let first =
4036            lineage("first_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4037        let second =
4038            lineage("second_value", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4039
4040        let first_child = first.downstream.first().expect("first source");
4041        let second_child = second.downstream.first().expect("second source");
4042
4043        assert_eq!(first_child.name, "_0.a");
4044        assert_eq!(first_child.source_name, "_0");
4045        assert_eq!(first_child.source_alias.as_deref(), Some("a"));
4046        assert_eq!(first_child.source_kind, SourceKind::Virtual);
4047
4048        assert_eq!(second_child.name, "_1.b");
4049        assert_eq!(second_child.source_name, "_1");
4050        assert_eq!(second_child.source_alias.as_deref(), Some("b"));
4051        assert_eq!(second_child.source_kind, SourceKind::Virtual);
4052    }
4053
4054    #[test]
4055    fn test_lineage_table_backed_unnest_points_to_real_source_column() {
4056        let expr = parse_one(
4057            r#"
4058SELECT item.item AS item
4059FROM t JOIN UNNEST(t.items) AS item ON TRUE
4060"#,
4061            DialectType::BigQuery,
4062        )
4063        .expect("parse");
4064
4065        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4066        let virtual_child = node.downstream.first().expect("virtual item source");
4067        assert_eq!(virtual_child.name, "_0.item");
4068        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4069
4070        let real_child = virtual_child
4071            .downstream
4072            .first()
4073            .expect("UNNEST(t.items) should depend on t.items");
4074        assert_eq!(real_child.name, "t.items");
4075        assert_eq!(real_child.source_name, "t");
4076        assert_eq!(real_child.source_kind, SourceKind::Table);
4077    }
4078
4079    #[test]
4080    fn test_lineage_table_backed_unnest_unqualified_column_resolves_to_virtual_source() {
4081        let expr = parse_one(
4082            r#"
4083SELECT item AS item
4084FROM t JOIN UNNEST(t.items) AS item ON TRUE
4085"#,
4086            DialectType::BigQuery,
4087        )
4088        .expect("parse");
4089
4090        let node = lineage("item", &expr, Some(DialectType::BigQuery), false).expect("lineage");
4091        let virtual_child = node.downstream.first().expect("virtual item source");
4092        assert_eq!(virtual_child.name, "_0.item");
4093        assert_eq!(virtual_child.source_name, "_0");
4094        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4095        assert_eq!(virtual_child.source_alias.as_deref(), Some("item"));
4096
4097        let real_child = virtual_child
4098            .downstream
4099            .first()
4100            .expect("UNNEST(t.items) should depend on t.items");
4101        assert_eq!(real_child.name, "t.items");
4102        assert_eq!(real_child.source_name, "t");
4103        assert_eq!(real_child.source_kind, SourceKind::Table);
4104    }
4105
4106    #[test]
4107    fn test_lineage_unnest_alias_columns_resolve_to_virtual_sources_across_dialects() {
4108        let cases = [
4109            (
4110                DialectType::PostgreSQL,
4111                "SELECT x AS out FROM t CROSS JOIN LATERAL UNNEST(items) AS u(x)",
4112            ),
4113            (
4114                DialectType::Presto,
4115                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
4116            ),
4117            (
4118                DialectType::Trino,
4119                "SELECT x AS out FROM t CROSS JOIN UNNEST(items) AS u(x)",
4120            ),
4121        ];
4122
4123        for (dialect, sql) in cases {
4124            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
4125            let node = lineage("out", &expr, Some(dialect), false)
4126                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
4127            let virtual_child = node
4128                .downstream
4129                .first()
4130                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
4131
4132            assert_eq!(
4133                virtual_child.name, "_0.x",
4134                "unexpected virtual child for {dialect:?}"
4135            );
4136            assert_eq!(virtual_child.source_name, "_0");
4137            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4138            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
4139
4140            let real_child = virtual_child
4141                .downstream
4142                .first()
4143                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
4144            assert_eq!(real_child.name, "t.items");
4145            assert_eq!(real_child.source_kind, SourceKind::Table);
4146        }
4147    }
4148
4149    #[test]
4150    fn test_lineage_lateral_view_columns_resolve_to_virtual_sources() {
4151        let cases = [
4152            (
4153                DialectType::Spark,
4154                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
4155            ),
4156            (
4157                DialectType::Hive,
4158                "SELECT x AS out FROM t LATERAL VIEW EXPLODE(items) u AS x",
4159            ),
4160        ];
4161
4162        for (dialect, sql) in cases {
4163            let expr = parse_one(sql, dialect).unwrap_or_else(|e| panic!("parse {dialect:?}: {e}"));
4164            let node = lineage("out", &expr, Some(dialect), false)
4165                .unwrap_or_else(|e| panic!("lineage {dialect:?}: {e}"));
4166            let virtual_child = node
4167                .downstream
4168                .first()
4169                .unwrap_or_else(|| panic!("expected virtual child for {dialect:?}"));
4170
4171            assert_eq!(virtual_child.name, "_0.x");
4172            assert_eq!(virtual_child.source_name, "_0");
4173            assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4174            assert_eq!(virtual_child.source_alias.as_deref(), Some("u"));
4175
4176            let real_child = virtual_child
4177                .downstream
4178                .first()
4179                .unwrap_or_else(|| panic!("expected table dependency for {dialect:?}"));
4180            assert_eq!(real_child.name, "t.items");
4181            assert_eq!(real_child.source_kind, SourceKind::Table);
4182        }
4183    }
4184
4185    #[test]
4186    fn test_lineage_snowflake_lateral_flatten_is_virtual_source() {
4187        let expr = parse_one(
4188            "SELECT f.value AS value FROM raw_events, LATERAL FLATTEN(INPUT => payload:items) AS f",
4189            DialectType::Snowflake,
4190        )
4191        .expect("parse");
4192
4193        let node = lineage("value", &expr, Some(DialectType::Snowflake), false).expect("lineage");
4194        let virtual_child = node.downstream.first().expect("virtual flatten source");
4195        assert_eq!(virtual_child.name, "_0.value");
4196        assert_eq!(virtual_child.source_name, "_0");
4197        assert_eq!(virtual_child.source_kind, SourceKind::Virtual);
4198        assert_eq!(virtual_child.source_alias.as_deref(), Some("f"));
4199
4200        let real_child = virtual_child
4201            .downstream
4202            .first()
4203            .expect("FLATTEN input should depend on raw_events.payload");
4204        assert_eq!(real_child.name, "raw_events.payload");
4205        assert_eq!(real_child.source_kind, SourceKind::Table);
4206    }
4207
4208    #[test]
4209    fn test_lineage_with_schema_snowflake_datediff_date_part_issue_61() {
4210        let expr = parse_one(
4211            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
4212            DialectType::Snowflake,
4213        )
4214        .expect("parse");
4215
4216        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4217        schema
4218            .add_table(
4219                "fact.some_daily_metrics",
4220                &[("date_utc".to_string(), DataType::Date)],
4221                None,
4222            )
4223            .expect("schema setup");
4224
4225        let node = lineage_with_schema(
4226            "recency",
4227            &expr,
4228            Some(&schema),
4229            Some(DialectType::Snowflake),
4230            false,
4231        )
4232        .expect("lineage_with_schema should not treat date part as a column");
4233
4234        let names = node.downstream_names();
4235        assert!(
4236            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4237            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4238            names
4239        );
4240        assert!(
4241            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4242            "Did not expect date part to appear as lineage column, got: {:?}",
4243            names
4244        );
4245    }
4246
4247    #[test]
4248    fn test_snowflake_datediff_parses_to_typed_ast() {
4249        let expr = parse_one(
4250            "SELECT DATEDIFF(day, date_utc, CURRENT_DATE()) AS recency FROM fact.some_daily_metrics",
4251            DialectType::Snowflake,
4252        )
4253        .expect("parse");
4254
4255        match expr {
4256            Expression::Select(select) => match &select.expressions[0] {
4257                Expression::Alias(alias) => match &alias.this {
4258                    Expression::DateDiff(f) => {
4259                        assert_eq!(f.unit, Some(crate::expressions::IntervalUnit::Day));
4260                    }
4261                    other => panic!("expected DateDiff, got {other:?}"),
4262                },
4263                other => panic!("expected Alias, got {other:?}"),
4264            },
4265            other => panic!("expected Select, got {other:?}"),
4266        }
4267    }
4268
4269    #[test]
4270    fn test_lineage_with_schema_snowflake_dateadd_date_part_issue_followup() {
4271        let expr = parse_one(
4272            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4273            DialectType::Snowflake,
4274        )
4275        .expect("parse");
4276
4277        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4278        schema
4279            .add_table(
4280                "fact.some_daily_metrics",
4281                &[("date_utc".to_string(), DataType::Date)],
4282                None,
4283            )
4284            .expect("schema setup");
4285
4286        let node = lineage_with_schema(
4287            "next_day",
4288            &expr,
4289            Some(&schema),
4290            Some(DialectType::Snowflake),
4291            false,
4292        )
4293        .expect("lineage_with_schema should not treat DATEADD date part as a column");
4294
4295        let names = node.downstream_names();
4296        assert!(
4297            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4298            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4299            names
4300        );
4301        assert!(
4302            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4303            "Did not expect date part to appear as lineage column, got: {:?}",
4304            names
4305        );
4306    }
4307
4308    #[test]
4309    fn test_lineage_with_schema_snowflake_date_part_identifier_issue_followup() {
4310        let expr = parse_one(
4311            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4312            DialectType::Snowflake,
4313        )
4314        .expect("parse");
4315
4316        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4317        schema
4318            .add_table(
4319                "fact.some_daily_metrics",
4320                &[("date_utc".to_string(), DataType::Date)],
4321                None,
4322            )
4323            .expect("schema setup");
4324
4325        let node = lineage_with_schema(
4326            "day_part",
4327            &expr,
4328            Some(&schema),
4329            Some(DialectType::Snowflake),
4330            false,
4331        )
4332        .expect("lineage_with_schema should not treat DATE_PART identifier as a column");
4333
4334        let names = node.downstream_names();
4335        assert!(
4336            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4337            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4338            names
4339        );
4340        assert!(
4341            !names.iter().any(|n| n.ends_with(".day") || n == "day"),
4342            "Did not expect date part to appear as lineage column, got: {:?}",
4343            names
4344        );
4345    }
4346
4347    #[test]
4348    fn test_lineage_with_schema_snowflake_date_part_string_literal_control() {
4349        let expr = parse_one(
4350            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4351            DialectType::Snowflake,
4352        )
4353        .expect("parse");
4354
4355        let mut schema = MappingSchema::with_dialect(DialectType::Snowflake);
4356        schema
4357            .add_table(
4358                "fact.some_daily_metrics",
4359                &[("date_utc".to_string(), DataType::Date)],
4360                None,
4361            )
4362            .expect("schema setup");
4363
4364        let node = lineage_with_schema(
4365            "day_part",
4366            &expr,
4367            Some(&schema),
4368            Some(DialectType::Snowflake),
4369            false,
4370        )
4371        .expect("quoted DATE_PART should continue to work");
4372
4373        let names = node.downstream_names();
4374        assert!(
4375            names.iter().any(|n| n == "some_daily_metrics.date_utc"),
4376            "Expected some_daily_metrics.date_utc in downstream, got: {:?}",
4377            names
4378        );
4379    }
4380
4381    #[test]
4382    fn test_snowflake_dateadd_date_part_identifier_stays_generic_function() {
4383        let expr = parse_one(
4384            "SELECT DATEADD(day, 1, date_utc) AS next_day FROM fact.some_daily_metrics",
4385            DialectType::Snowflake,
4386        )
4387        .expect("parse");
4388
4389        match expr {
4390            Expression::Select(select) => match &select.expressions[0] {
4391                Expression::Alias(alias) => match &alias.this {
4392                    Expression::Function(f) => {
4393                        assert_eq!(f.name.to_uppercase(), "DATEADD");
4394                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4395                    }
4396                    other => panic!("expected generic DATEADD function, got {other:?}"),
4397                },
4398                other => panic!("expected Alias, got {other:?}"),
4399            },
4400            other => panic!("expected Select, got {other:?}"),
4401        }
4402    }
4403
4404    #[test]
4405    fn test_snowflake_date_part_identifier_stays_generic_function_with_var_arg() {
4406        let expr = parse_one(
4407            "SELECT DATE_PART(day, date_utc) AS day_part FROM fact.some_daily_metrics",
4408            DialectType::Snowflake,
4409        )
4410        .expect("parse");
4411
4412        match expr {
4413            Expression::Select(select) => match &select.expressions[0] {
4414                Expression::Alias(alias) => match &alias.this {
4415                    Expression::Function(f) => {
4416                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
4417                        assert!(matches!(&f.args[0], Expression::Var(v) if v.this == "day"));
4418                    }
4419                    other => panic!("expected generic DATE_PART function, got {other:?}"),
4420                },
4421                other => panic!("expected Alias, got {other:?}"),
4422            },
4423            other => panic!("expected Select, got {other:?}"),
4424        }
4425    }
4426
4427    #[test]
4428    fn test_snowflake_date_part_string_literal_stays_generic_function() {
4429        let expr = parse_one(
4430            "SELECT DATE_PART('day', date_utc) AS day_part FROM fact.some_daily_metrics",
4431            DialectType::Snowflake,
4432        )
4433        .expect("parse");
4434
4435        match expr {
4436            Expression::Select(select) => match &select.expressions[0] {
4437                Expression::Alias(alias) => match &alias.this {
4438                    Expression::Function(f) => {
4439                        assert_eq!(f.name.to_uppercase(), "DATE_PART");
4440                    }
4441                    other => panic!("expected generic DATE_PART function, got {other:?}"),
4442                },
4443                other => panic!("expected Alias, got {other:?}"),
4444            },
4445            other => panic!("expected Select, got {other:?}"),
4446        }
4447    }
4448
4449    #[test]
4450    fn test_lineage_join() {
4451        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4452
4453        let node_a = lineage("a", &expr, None, false).unwrap();
4454        let names_a = node_a.downstream_names();
4455        assert!(
4456            names_a.iter().any(|n| n == "t.a"),
4457            "Expected t.a, got: {:?}",
4458            names_a
4459        );
4460
4461        let node_b = lineage("b", &expr, None, false).unwrap();
4462        let names_b = node_b.downstream_names();
4463        assert!(
4464            names_b.iter().any(|n| n == "s.b"),
4465            "Expected s.b, got: {:?}",
4466            names_b
4467        );
4468    }
4469
4470    #[test]
4471    fn test_lineage_alias_leaf_has_resolved_source_name() {
4472        let expr = parse("SELECT t1.col1 FROM table1 t1 JOIN table2 t2 ON t1.id = t2.id");
4473        let node = lineage("col1", &expr, None, false).unwrap();
4474
4475        // Keep alias in the display lineage edge.
4476        let names = node.downstream_names();
4477        assert!(
4478            names.iter().any(|n| n == "t1.col1"),
4479            "Expected aliased column edge t1.col1, got: {:?}",
4480            names
4481        );
4482
4483        // Leaf should expose the resolved base table for consumers.
4484        let leaf = node
4485            .downstream
4486            .iter()
4487            .find(|n| n.name == "t1.col1")
4488            .expect("Expected t1.col1 leaf");
4489        assert_eq!(leaf.source_name, "table1");
4490        match &leaf.source {
4491            Expression::Table(table) => assert_eq!(table.name.name, "table1"),
4492            _ => panic!("Expected leaf source to be a table expression"),
4493        }
4494    }
4495
4496    #[test]
4497    fn test_lineage_derived_table() {
4498        let expr = parse("SELECT x.a FROM (SELECT a FROM t) AS x");
4499        let node = lineage("a", &expr, None, false).unwrap();
4500
4501        assert_eq!(node.name, "a");
4502        // Should trace through the derived table to t.a
4503        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4504        assert!(
4505            all_names.iter().any(|n| n == "t.a"),
4506            "Expected to trace through derived table to t.a, got: {:?}",
4507            all_names
4508        );
4509    }
4510
4511    #[test]
4512    fn test_lineage_cte() {
4513        let expr = parse("WITH cte AS (SELECT a FROM t) SELECT a FROM cte");
4514        let node = lineage("a", &expr, None, false).unwrap();
4515
4516        assert_eq!(node.name, "a");
4517        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4518        assert!(
4519            all_names.iter().any(|n| n == "t.a"),
4520            "Expected to trace through CTE to t.a, got: {:?}",
4521            all_names
4522        );
4523    }
4524
4525    #[test]
4526    fn test_lineage_union() {
4527        let expr = parse("SELECT a FROM t1 UNION SELECT a FROM t2");
4528        let node = lineage("a", &expr, None, false).unwrap();
4529
4530        assert_eq!(node.name, "a");
4531        // Should have 2 downstream branches
4532        assert_eq!(
4533            node.downstream.len(),
4534            2,
4535            "Expected 2 branches for UNION, got {}",
4536            node.downstream.len()
4537        );
4538    }
4539
4540    #[test]
4541    fn test_lineage_cte_union() {
4542        let expr = parse("WITH cte AS (SELECT a FROM t1 UNION SELECT a FROM t2) SELECT a FROM cte");
4543        let node = lineage("a", &expr, None, false).unwrap();
4544
4545        // Should trace through CTE into both UNION branches
4546        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4547        assert!(
4548            all_names.len() >= 3,
4549            "Expected at least 3 nodes for CTE with UNION, got: {:?}",
4550            all_names
4551        );
4552    }
4553
4554    #[test]
4555    fn test_lineage_star() {
4556        let expr = parse("SELECT * FROM t");
4557        let node = lineage("*", &expr, None, false).unwrap();
4558
4559        assert_eq!(node.name, "*");
4560        // Should have downstream for table t
4561        assert!(
4562            !node.downstream.is_empty(),
4563            "Star should produce downstream nodes"
4564        );
4565    }
4566
4567    #[test]
4568    fn test_lineage_subquery_in_select() {
4569        let expr = parse("SELECT (SELECT MAX(b) FROM s) AS x FROM t");
4570        let node = lineage("x", &expr, None, false).unwrap();
4571
4572        assert_eq!(node.name, "x");
4573        // Should have traced into the scalar subquery
4574        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4575        assert!(
4576            all_names.len() >= 2,
4577            "Expected tracing into scalar subquery, got: {:?}",
4578            all_names
4579        );
4580    }
4581
4582    #[test]
4583    fn test_lineage_multiple_columns() {
4584        let expr = parse("SELECT a, b FROM t");
4585
4586        let node_a = lineage("a", &expr, None, false).unwrap();
4587        let node_b = lineage("b", &expr, None, false).unwrap();
4588
4589        assert_eq!(node_a.name, "a");
4590        assert_eq!(node_b.name, "b");
4591
4592        // Each should trace independently
4593        let names_a = node_a.downstream_names();
4594        let names_b = node_b.downstream_names();
4595        assert!(names_a.iter().any(|n| n == "t.a"));
4596        assert!(names_b.iter().any(|n| n == "t.b"));
4597    }
4598
4599    #[test]
4600    fn test_get_source_tables() {
4601        let expr = parse("SELECT t.a, s.b FROM t JOIN s ON t.id = s.id");
4602        let node = lineage("a", &expr, None, false).unwrap();
4603
4604        let tables = get_source_tables(&node);
4605        assert!(
4606            tables.contains("t"),
4607            "Expected source table 't', got: {:?}",
4608            tables
4609        );
4610    }
4611
4612    #[test]
4613    fn test_lineage_column_not_found() {
4614        let expr = parse("SELECT a FROM t");
4615        let result = lineage("nonexistent", &expr, None, false);
4616        assert!(result.is_err());
4617    }
4618
4619    #[test]
4620    fn test_lineage_nested_cte() {
4621        let expr = parse(
4622            "WITH cte1 AS (SELECT a FROM t), \
4623             cte2 AS (SELECT a FROM cte1) \
4624             SELECT a FROM cte2",
4625        );
4626        let node = lineage("a", &expr, None, false).unwrap();
4627
4628        // Should trace through cte2 → cte1 → t
4629        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4630        assert!(
4631            all_names.len() >= 3,
4632            "Expected to trace through nested CTEs, got: {:?}",
4633            all_names
4634        );
4635    }
4636
4637    #[test]
4638    fn test_trim_selects_true() {
4639        let expr = parse("SELECT a, b, c FROM t");
4640        let node = lineage("a", &expr, None, true).unwrap();
4641
4642        // The source should be trimmed to only include 'a'
4643        if let Expression::Select(select) = &node.source {
4644            assert_eq!(
4645                select.expressions.len(),
4646                1,
4647                "Trimmed source should have 1 expression, got {}",
4648                select.expressions.len()
4649            );
4650        } else {
4651            panic!("Expected Select source");
4652        }
4653    }
4654
4655    #[test]
4656    fn test_trim_selects_false() {
4657        let expr = parse("SELECT a, b, c FROM t");
4658        let node = lineage("a", &expr, None, false).unwrap();
4659
4660        // The source should keep all columns
4661        if let Expression::Select(select) = &node.source {
4662            assert_eq!(
4663                select.expressions.len(),
4664                3,
4665                "Untrimmed source should have 3 expressions"
4666            );
4667        } else {
4668            panic!("Expected Select source");
4669        }
4670    }
4671
4672    #[test]
4673    fn test_lineage_expression_in_select() {
4674        let expr = parse("SELECT a + b AS c FROM t");
4675        let node = lineage("c", &expr, None, false).unwrap();
4676
4677        // Should trace to both a and b from t
4678        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4679        assert!(
4680            all_names.len() >= 3,
4681            "Expected to trace a + b to both columns, got: {:?}",
4682            all_names
4683        );
4684    }
4685
4686    #[test]
4687    fn test_set_operation_by_index() {
4688        let expr = parse("SELECT a FROM t1 UNION SELECT b FROM t2");
4689
4690        // Trace column "a" which is at index 0
4691        let node = lineage("a", &expr, None, false).unwrap();
4692
4693        // UNION branches should be traced by index
4694        assert_eq!(node.downstream.len(), 2);
4695    }
4696
4697    // --- Tests for column lineage inside function calls (issue #18) ---
4698
4699    fn print_node(node: &LineageNode, indent: usize) {
4700        let pad = "  ".repeat(indent);
4701        println!(
4702            "{pad}name={:?} source_name={:?}",
4703            node.name, node.source_name
4704        );
4705        for child in &node.downstream {
4706            print_node(child, indent + 1);
4707        }
4708    }
4709
4710    #[test]
4711    fn test_issue18_repro() {
4712        // Exact scenario from the issue
4713        let query = "SELECT UPPER(name) as upper_name FROM users";
4714        println!("Query: {query}\n");
4715
4716        let dialect = crate::dialects::Dialect::get(DialectType::BigQuery);
4717        let exprs = dialect.parse(query).unwrap();
4718        let expr = &exprs[0];
4719
4720        let node = lineage("upper_name", expr, Some(DialectType::BigQuery), false).unwrap();
4721        println!("lineage(\"upper_name\"):");
4722        print_node(&node, 1);
4723
4724        let names = node.downstream_names();
4725        assert!(
4726            names.iter().any(|n| n == "users.name"),
4727            "Expected users.name in downstream, got: {:?}",
4728            names
4729        );
4730    }
4731
4732    #[test]
4733    fn test_lineage_bigquery_safe_namespace_issue207() {
4734        let query = r#"
4735WITH import_cte AS (
4736  SELECT timestamp, data, operation
4737  FROM `project`.`dataset`.`source_table`
4738),
4739transform_cte AS (
4740  SELECT
4741    timestamp,
4742    SAFE.PARSE_JSON(data) AS json_data
4743  FROM import_cte
4744)
4745SELECT json_data FROM transform_cte
4746"#;
4747        let expr = parse_one(query, DialectType::BigQuery).expect("parse");
4748        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4749            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4750        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4751
4752        assert!(
4753            names.iter().any(|name| name == "source_table.data"),
4754            "expected source_table.data in lineage, got {names:?}"
4755        );
4756        assert!(
4757            !names
4758                .iter()
4759                .any(|name| name.eq_ignore_ascii_case("import_cte.safe")),
4760            "did not expect SAFE namespace receiver in lineage, got {names:?}"
4761        );
4762    }
4763
4764    #[test]
4765    fn test_lineage_bigquery_safe_namespace_method_call_guard() {
4766        let expr = parse("SELECT SAFE.PARSE_JSON(data) AS json_data FROM t");
4767        let node = lineage("json_data", &expr, Some(DialectType::BigQuery), false)
4768            .expect("lineage should resolve SAFE.PARSE_JSON arguments");
4769        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4770
4771        assert!(
4772            names.iter().any(|name| name == "t.data"),
4773            "expected t.data in lineage, got {names:?}"
4774        );
4775        assert!(
4776            !names.iter().any(|name| name.eq_ignore_ascii_case("t.safe")),
4777            "did not expect SAFE namespace receiver in lineage, got {names:?}"
4778        );
4779    }
4780
4781    #[test]
4782    fn test_lineage_method_call_receiver_control() {
4783        let expr = parse("SELECT obj.METHOD(arg) AS out FROM t");
4784        let node = lineage("out", &expr, None, false).expect("lineage");
4785        let names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4786
4787        assert!(
4788            names.iter().any(|name| name == "t.obj"),
4789            "expected ordinary method receiver to remain in lineage, got {names:?}"
4790        );
4791        assert!(
4792            names.iter().any(|name| name == "t.arg"),
4793            "expected method argument in lineage, got {names:?}"
4794        );
4795    }
4796
4797    #[test]
4798    fn test_lineage_upper_function() {
4799        let expr = parse("SELECT UPPER(name) AS upper_name FROM users");
4800        let node = lineage("upper_name", &expr, None, false).unwrap();
4801
4802        let names = node.downstream_names();
4803        assert!(
4804            names.iter().any(|n| n == "users.name"),
4805            "Expected users.name in downstream, got: {:?}",
4806            names
4807        );
4808    }
4809
4810    #[test]
4811    fn test_lineage_round_function() {
4812        let expr = parse("SELECT ROUND(price, 2) AS rounded FROM products");
4813        let node = lineage("rounded", &expr, None, false).unwrap();
4814
4815        let names = node.downstream_names();
4816        assert!(
4817            names.iter().any(|n| n == "products.price"),
4818            "Expected products.price in downstream, got: {:?}",
4819            names
4820        );
4821    }
4822
4823    #[test]
4824    fn test_lineage_coalesce_function() {
4825        let expr = parse("SELECT COALESCE(a, b) AS val FROM t");
4826        let node = lineage("val", &expr, None, false).unwrap();
4827
4828        let names = node.downstream_names();
4829        assert!(
4830            names.iter().any(|n| n == "t.a"),
4831            "Expected t.a in downstream, got: {:?}",
4832            names
4833        );
4834        assert!(
4835            names.iter().any(|n| n == "t.b"),
4836            "Expected t.b in downstream, got: {:?}",
4837            names
4838        );
4839    }
4840
4841    #[test]
4842    fn test_lineage_count_function() {
4843        let expr = parse("SELECT COUNT(id) AS cnt FROM t");
4844        let node = lineage("cnt", &expr, None, false).unwrap();
4845
4846        let names = node.downstream_names();
4847        assert!(
4848            names.iter().any(|n| n == "t.id"),
4849            "Expected t.id in downstream, got: {:?}",
4850            names
4851        );
4852    }
4853
4854    #[test]
4855    fn test_lineage_sum_function() {
4856        let expr = parse("SELECT SUM(amount) AS total FROM t");
4857        let node = lineage("total", &expr, None, false).unwrap();
4858
4859        let names = node.downstream_names();
4860        assert!(
4861            names.iter().any(|n| n == "t.amount"),
4862            "Expected t.amount in downstream, got: {:?}",
4863            names
4864        );
4865    }
4866
4867    #[test]
4868    fn test_lineage_case_with_nested_functions() {
4869        let expr =
4870            parse("SELECT CASE WHEN x > 0 THEN UPPER(name) ELSE LOWER(name) END AS result FROM t");
4871        let node = lineage("result", &expr, None, false).unwrap();
4872
4873        let names = node.downstream_names();
4874        assert!(
4875            names.iter().any(|n| n == "t.x"),
4876            "Expected t.x in downstream, got: {:?}",
4877            names
4878        );
4879        assert!(
4880            names.iter().any(|n| n == "t.name"),
4881            "Expected t.name in downstream, got: {:?}",
4882            names
4883        );
4884    }
4885
4886    #[test]
4887    fn test_lineage_substring_function() {
4888        let expr = parse("SELECT SUBSTRING(name, 1, 3) AS short FROM t");
4889        let node = lineage("short", &expr, None, false).unwrap();
4890
4891        let names = node.downstream_names();
4892        assert!(
4893            names.iter().any(|n| n == "t.name"),
4894            "Expected t.name in downstream, got: {:?}",
4895            names
4896        );
4897    }
4898
4899    // --- CTE + SELECT * tests (ported from sqlglot test_lineage.py) ---
4900
4901    #[test]
4902    fn test_lineage_cte_select_star() {
4903        // Ported from sqlglot: test_lineage_source_with_star
4904        // WITH y AS (SELECT * FROM x) SELECT a FROM y
4905        // After star expansion: SELECT y.a AS a FROM y
4906        let expr = parse("WITH y AS (SELECT * FROM x) SELECT a FROM y");
4907        let node = lineage("a", &expr, None, false).unwrap();
4908
4909        assert_eq!(node.name, "a");
4910        // Should successfully resolve column 'a' through the CTE
4911        // (previously failed with "Cannot find column 'a' in query")
4912        assert!(
4913            !node.downstream.is_empty(),
4914            "Expected downstream nodes tracing through CTE, got none"
4915        );
4916    }
4917
4918    #[test]
4919    fn test_lineage_schema_less_cte_star_passthrough_resolves_base_column() {
4920        let expr = parse("WITH c AS (SELECT * FROM t) SELECT c.x FROM c");
4921        let node = lineage("x", &expr, None, false).unwrap();
4922
4923        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4924        assert!(
4925            all_names.iter().any(|name| name == "t.x"),
4926            "Expected schema-less CTE star passthrough to reach t.x, got: {:?}",
4927            all_names
4928        );
4929
4930        let cte_node = node
4931            .walk()
4932            .find(|child| child.source_kind == SourceKind::Cte && child.source_name == "c")
4933            .expect("expected CTE hop with source_name c");
4934        assert_eq!(cte_node.source_kind, SourceKind::Cte);
4935        assert_eq!(cte_node.source_name, "c");
4936    }
4937
4938    #[test]
4939    fn test_lineage_schema_less_cte_star_passthrough_with_aggregation() {
4940        let expr = parse(
4941            "WITH c AS (SELECT * FROM t) \
4942             SELECT SUM(c.x) AS s FROM c GROUP BY 1",
4943        );
4944        let node = lineage("s", &expr, None, false).unwrap();
4945
4946        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4947        assert!(
4948            all_names.iter().any(|name| name == "t.x"),
4949            "Expected aggregate over CTE star passthrough to reach t.x, got: {:?}",
4950            all_names
4951        );
4952    }
4953
4954    #[test]
4955    fn test_lineage_schema_less_cte_star_passthrough_with_join_and_alias() {
4956        let expr = parse(
4957            "WITH a AS (SELECT * FROM t1), b AS (SELECT * FROM t2) \
4958             SELECT SUM(b.x) AS s FROM a LEFT JOIN b ON b.id = a.id GROUP BY a.k",
4959        );
4960        let node = lineage("s", &expr, None, false).unwrap();
4961
4962        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4963        assert!(
4964            all_names.iter().any(|name| name == "t2.x"),
4965            "Expected joined CTE star passthrough to reach t2.x, got: {:?}",
4966            all_names
4967        );
4968    }
4969
4970    #[test]
4971    fn test_lineage_schema_less_chained_cte_star_passthrough() {
4972        let expr = parse(
4973            "WITH c1 AS (SELECT * FROM t), \
4974             c2 AS (SELECT * FROM c1), \
4975             c3 AS (SELECT * FROM c2) \
4976             SELECT c3.x FROM c3",
4977        );
4978        let node = lineage("x", &expr, None, false).unwrap();
4979
4980        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
4981        assert!(
4982            all_names.iter().any(|name| name == "t.x"),
4983            "Expected chained CTE star passthrough to reach t.x, got: {:?}",
4984            all_names
4985        );
4986    }
4987
4988    #[test]
4989    fn test_lineage_schema_less_unqualified_star_with_multiple_sources_does_not_guess() {
4990        let expr = parse("SELECT * FROM t1 JOIN t2 ON t1.id = t2.id");
4991        let result = lineage("x", &expr, None, false);
4992
4993        assert!(
4994            result.is_err(),
4995            "Unqualified star over multiple sources should remain ambiguous, got: {:?}",
4996            result
4997        );
4998    }
4999
5000    #[test]
5001    fn test_lineage_cte_select_star_renamed_column() {
5002        // dbt standard pattern: CTE with column rename + outer SELECT *
5003        // This is the primary use case for dbt projects (jaffle-shop etc.)
5004        let expr =
5005            parse("WITH renamed AS (SELECT id AS customer_id FROM source) SELECT * FROM renamed");
5006        let node = lineage("customer_id", &expr, None, false).unwrap();
5007
5008        assert_eq!(node.name, "customer_id");
5009        // Should trace customer_id → renamed CTE → source.id
5010        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5011        assert!(
5012            all_names.len() >= 2,
5013            "Expected at least 2 nodes (customer_id → source), got: {:?}",
5014            all_names
5015        );
5016    }
5017
5018    #[test]
5019    fn test_lineage_cte_select_star_multiple_columns() {
5020        // CTE exposes multiple columns, outer SELECT * should resolve each
5021        let expr = parse("WITH cte AS (SELECT a, b, c FROM t) SELECT * FROM cte");
5022
5023        for col in &["a", "b", "c"] {
5024            let node = lineage(col, &expr, None, false).unwrap();
5025            assert_eq!(node.name, *col);
5026            // Verify lineage resolves without error (star expanded to explicit columns)
5027            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5028            assert!(
5029                all_names.len() >= 2,
5030                "Expected at least 2 nodes for column {}, got: {:?}",
5031                col,
5032                all_names
5033            );
5034        }
5035    }
5036
5037    #[test]
5038    fn test_lineage_nested_cte_select_star() {
5039        // Nested CTE star expansion: cte2 references cte1 via SELECT *
5040        let expr = parse(
5041            "WITH cte1 AS (SELECT a FROM t), \
5042             cte2 AS (SELECT * FROM cte1) \
5043             SELECT * FROM cte2",
5044        );
5045        let node = lineage("a", &expr, None, false).unwrap();
5046
5047        assert_eq!(node.name, "a");
5048        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5049        assert!(
5050            all_names.len() >= 3,
5051            "Expected at least 3 nodes (a → cte2 → cte1 → t.a), got: {:?}",
5052            all_names
5053        );
5054    }
5055
5056    #[test]
5057    fn test_lineage_three_level_nested_cte_star() {
5058        // Three-level nested CTE: cte3 → cte2 → cte1 → t
5059        let expr = parse(
5060            "WITH cte1 AS (SELECT x FROM t), \
5061             cte2 AS (SELECT * FROM cte1), \
5062             cte3 AS (SELECT * FROM cte2) \
5063             SELECT * FROM cte3",
5064        );
5065        let node = lineage("x", &expr, None, false).unwrap();
5066
5067        assert_eq!(node.name, "x");
5068        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5069        assert!(
5070            all_names.len() >= 4,
5071            "Expected at least 4 nodes through 3-level CTE chain, got: {:?}",
5072            all_names
5073        );
5074    }
5075
5076    #[test]
5077    fn test_lineage_cte_union_star() {
5078        // CTE with UNION body, outer SELECT * should resolve from left branch
5079        let expr = parse(
5080            "WITH cte AS (SELECT a, b FROM t1 UNION ALL SELECT a, b FROM t2) \
5081             SELECT * FROM cte",
5082        );
5083        let node = lineage("a", &expr, None, false).unwrap();
5084
5085        assert_eq!(node.name, "a");
5086        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5087        assert!(
5088            all_names.len() >= 2,
5089            "Expected at least 2 nodes for CTE union star, got: {:?}",
5090            all_names
5091        );
5092    }
5093
5094    #[test]
5095    fn test_lineage_cte_star_unknown_table() {
5096        // When CTE references an unknown table, star expansion is skipped gracefully
5097        // and lineage falls back to normal resolution (which may fail)
5098        let expr = parse(
5099            "WITH cte AS (SELECT * FROM unknown_table) \
5100             SELECT * FROM cte",
5101        );
5102        // This should not panic — it may succeed or fail depending on resolution,
5103        // but should not crash
5104        let _result = lineage("x", &expr, None, false);
5105    }
5106
5107    #[test]
5108    fn test_lineage_cte_explicit_columns() {
5109        // CTE with explicit column list: cte(x, y) AS (SELECT a, b FROM t)
5110        let expr = parse(
5111            "WITH cte(x, y) AS (SELECT a, b FROM t) \
5112             SELECT * FROM cte",
5113        );
5114        let node = lineage("x", &expr, None, false).unwrap();
5115        assert_eq!(node.name, "x");
5116    }
5117
5118    #[test]
5119    fn test_lineage_cte_qualified_star() {
5120        // Qualified star: SELECT cte.* FROM cte
5121        let expr = parse(
5122            "WITH cte AS (SELECT a, b FROM t) \
5123             SELECT cte.* FROM cte",
5124        );
5125        for col in &["a", "b"] {
5126            let node = lineage(col, &expr, None, false).unwrap();
5127            assert_eq!(node.name, *col);
5128            let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5129            assert!(
5130                all_names.len() >= 2,
5131                "Expected at least 2 nodes for qualified star column {}, got: {:?}",
5132                col,
5133                all_names
5134            );
5135        }
5136    }
5137
5138    #[test]
5139    fn test_lineage_subquery_select_star() {
5140        // Ported from sqlglot: test_select_star
5141        // SELECT x FROM (SELECT * FROM table_a)
5142        let expr = parse("SELECT x FROM (SELECT * FROM table_a)");
5143        let node = lineage("x", &expr, None, false).unwrap();
5144
5145        assert_eq!(node.name, "x");
5146        assert!(
5147            !node.downstream.is_empty(),
5148            "Expected downstream nodes for subquery with SELECT *, got none"
5149        );
5150    }
5151
5152    #[test]
5153    fn test_lineage_cte_star_with_schema_external_table() {
5154        // CTE references an external table via SELECT * — schema enables expansion
5155        let sql = r#"WITH orders AS (SELECT * FROM stg_orders)
5156SELECT * FROM orders"#;
5157        let expr = parse(sql);
5158
5159        let mut schema = MappingSchema::new();
5160        let cols = vec![
5161            ("order_id".to_string(), DataType::Unknown),
5162            ("customer_id".to_string(), DataType::Unknown),
5163            ("amount".to_string(), DataType::Unknown),
5164        ];
5165        schema.add_table("stg_orders", &cols, None).unwrap();
5166
5167        let node =
5168            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5169                .unwrap();
5170        assert_eq!(node.name, "order_id");
5171    }
5172
5173    #[test]
5174    fn test_lineage_cte_star_with_schema_three_part_name() {
5175        // CTE references an external table with fully-qualified 3-part name
5176        let sql = r#"WITH orders AS (SELECT * FROM "db"."schema"."stg_orders")
5177SELECT * FROM orders"#;
5178        let expr = parse(sql);
5179
5180        let mut schema = MappingSchema::new();
5181        let cols = vec![
5182            ("order_id".to_string(), DataType::Unknown),
5183            ("customer_id".to_string(), DataType::Unknown),
5184        ];
5185        schema
5186            .add_table("db.schema.stg_orders", &cols, None)
5187            .unwrap();
5188
5189        let node = lineage_with_schema(
5190            "customer_id",
5191            &expr,
5192            Some(&schema as &dyn Schema),
5193            None,
5194            false,
5195        )
5196        .unwrap();
5197        assert_eq!(node.name, "customer_id");
5198    }
5199
5200    #[test]
5201    fn test_lineage_cte_star_with_schema_nested() {
5202        // Nested CTEs: outer CTE references inner CTE with SELECT *,
5203        // inner CTE references external table with SELECT *
5204        let sql = r#"WITH
5205            raw AS (SELECT * FROM external_table),
5206            enriched AS (SELECT * FROM raw)
5207        SELECT * FROM enriched"#;
5208        let expr = parse(sql);
5209
5210        let mut schema = MappingSchema::new();
5211        let cols = vec![
5212            ("id".to_string(), DataType::Unknown),
5213            ("name".to_string(), DataType::Unknown),
5214        ];
5215        schema.add_table("external_table", &cols, None).unwrap();
5216
5217        let node =
5218            lineage_with_schema("name", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5219        assert_eq!(node.name, "name");
5220    }
5221
5222    #[test]
5223    fn test_lineage_cte_qualified_star_with_schema() {
5224        // CTE uses qualified star (orders.*) from a CTE whose columns
5225        // come from an external table via SELECT *
5226        let sql = r#"WITH
5227            orders AS (SELECT * FROM stg_orders),
5228            enriched AS (
5229                SELECT orders.*, 'extra' AS extra
5230                FROM orders
5231            )
5232        SELECT * FROM enriched"#;
5233        let expr = parse(sql);
5234
5235        let mut schema = MappingSchema::new();
5236        let cols = vec![
5237            ("order_id".to_string(), DataType::Unknown),
5238            ("total".to_string(), DataType::Unknown),
5239        ];
5240        schema.add_table("stg_orders", &cols, None).unwrap();
5241
5242        let node =
5243            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5244                .unwrap();
5245        assert_eq!(node.name, "order_id");
5246
5247        // Also verify the extra column works
5248        let extra =
5249            lineage_with_schema("extra", &expr, Some(&schema as &dyn Schema), None, false).unwrap();
5250        assert_eq!(extra.name, "extra");
5251    }
5252
5253    #[test]
5254    fn test_lineage_cte_star_without_schema_still_works() {
5255        // Without schema, CTE-to-CTE star expansion still works
5256        let sql = r#"WITH
5257            cte1 AS (SELECT id, name FROM raw_table),
5258            cte2 AS (SELECT * FROM cte1)
5259        SELECT * FROM cte2"#;
5260        let expr = parse(sql);
5261
5262        // No schema — should still resolve through CTE chain
5263        let node = lineage("id", &expr, None, false).unwrap();
5264        assert_eq!(node.name, "id");
5265    }
5266
5267    #[test]
5268    fn test_lineage_nested_cte_star_with_join_and_schema() {
5269        // Reproduces dbt pattern: CTE chain with qualified star and JOIN
5270        // base_orders -> with_payments (JOIN) -> final -> outer SELECT
5271        let sql = r#"WITH
5272base_orders AS (
5273    SELECT * FROM stg_orders
5274),
5275with_payments AS (
5276    SELECT
5277        base_orders.*,
5278        p.amount
5279    FROM base_orders
5280    LEFT JOIN stg_payments p ON base_orders.order_id = p.order_id
5281),
5282final_cte AS (
5283    SELECT * FROM with_payments
5284)
5285SELECT * FROM final_cte"#;
5286        let expr = parse(sql);
5287
5288        let mut schema = MappingSchema::new();
5289        let order_cols = vec![
5290            (
5291                "order_id".to_string(),
5292                crate::expressions::DataType::Unknown,
5293            ),
5294            (
5295                "customer_id".to_string(),
5296                crate::expressions::DataType::Unknown,
5297            ),
5298            ("status".to_string(), crate::expressions::DataType::Unknown),
5299        ];
5300        let pay_cols = vec![
5301            (
5302                "payment_id".to_string(),
5303                crate::expressions::DataType::Unknown,
5304            ),
5305            (
5306                "order_id".to_string(),
5307                crate::expressions::DataType::Unknown,
5308            ),
5309            ("amount".to_string(), crate::expressions::DataType::Unknown),
5310        ];
5311        schema.add_table("stg_orders", &order_cols, None).unwrap();
5312        schema.add_table("stg_payments", &pay_cols, None).unwrap();
5313
5314        // order_id should trace back to stg_orders
5315        let node =
5316            lineage_with_schema("order_id", &expr, Some(&schema as &dyn Schema), None, false)
5317                .unwrap();
5318        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5319
5320        // The leaf should be "stg_orders.order_id" (not just "order_id")
5321        let has_table_qualified = all_names
5322            .iter()
5323            .any(|n| n.contains('.') && n.contains("order_id"));
5324        assert!(
5325            has_table_qualified,
5326            "Expected table-qualified leaf like 'stg_orders.order_id', got: {:?}",
5327            all_names
5328        );
5329
5330        // amount should trace back to stg_payments
5331        let node = lineage_with_schema("amount", &expr, Some(&schema as &dyn Schema), None, false)
5332            .unwrap();
5333        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5334
5335        let has_table_qualified = all_names
5336            .iter()
5337            .any(|n| n.contains('.') && n.contains("amount"));
5338        assert!(
5339            has_table_qualified,
5340            "Expected table-qualified leaf like 'stg_payments.amount', got: {:?}",
5341            all_names
5342        );
5343    }
5344
5345    #[test]
5346    fn test_lineage_cte_alias_resolution() {
5347        // FROM cte_name AS alias pattern: alias should resolve through CTE to source table
5348        let sql = r#"WITH import_stg_items AS (
5349    SELECT item_id, name, status FROM stg_items
5350)
5351SELECT base.item_id, base.status
5352FROM import_stg_items AS base"#;
5353        let expr = parse(sql);
5354
5355        let node = lineage("item_id", &expr, None, false).unwrap();
5356        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5357        // Should trace through alias "base" → CTE "import_stg_items" → "stg_items.item_id"
5358        assert!(
5359            all_names.iter().any(|n| n == "stg_items.item_id"),
5360            "Expected leaf 'stg_items.item_id', got: {:?}",
5361            all_names
5362        );
5363    }
5364
5365    #[test]
5366    fn test_lineage_cte_alias_with_schema_and_star() {
5367        // CTE alias + SELECT * expansion: FROM cte AS alias with star in CTE body
5368        let sql = r#"WITH import_stg AS (
5369    SELECT * FROM stg_items
5370)
5371SELECT base.item_id, base.status
5372FROM import_stg AS base"#;
5373        let expr = parse(sql);
5374
5375        let mut schema = MappingSchema::new();
5376        schema
5377            .add_table(
5378                "stg_items",
5379                &[
5380                    ("item_id".to_string(), DataType::Unknown),
5381                    ("name".to_string(), DataType::Unknown),
5382                    ("status".to_string(), DataType::Unknown),
5383                ],
5384                None,
5385            )
5386            .unwrap();
5387
5388        let node = lineage_with_schema("item_id", &expr, Some(&schema as &dyn Schema), None, false)
5389            .unwrap();
5390        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5391        assert!(
5392            all_names.iter().any(|n| n == "stg_items.item_id"),
5393            "Expected leaf 'stg_items.item_id', got: {:?}",
5394            all_names
5395        );
5396    }
5397
5398    #[test]
5399    fn test_lineage_cte_alias_with_join() {
5400        // Multiple CTE aliases in a JOIN: each should resolve independently
5401        let sql = r#"WITH
5402    import_users AS (SELECT id, name FROM users),
5403    import_orders AS (SELECT id, user_id, amount FROM orders)
5404SELECT u.name, o.amount
5405FROM import_users AS u
5406LEFT JOIN import_orders AS o ON u.id = o.user_id"#;
5407        let expr = parse(sql);
5408
5409        let node = lineage("name", &expr, None, false).unwrap();
5410        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5411        assert!(
5412            all_names.iter().any(|n| n == "users.name"),
5413            "Expected leaf 'users.name', got: {:?}",
5414            all_names
5415        );
5416
5417        let node = lineage("amount", &expr, None, false).unwrap();
5418        let all_names: Vec<_> = node.walk().map(|n| n.name.clone()).collect();
5419        assert!(
5420            all_names.iter().any(|n| n == "orders.amount"),
5421            "Expected leaf 'orders.amount', got: {:?}",
5422            all_names
5423        );
5424    }
5425
5426    // -----------------------------------------------------------------------
5427    // Quoted CTE name tests — verifying SQL identifier case semantics
5428    // -----------------------------------------------------------------------
5429
5430    #[test]
5431    fn test_lineage_unquoted_cte_case_insensitive() {
5432        // Unquoted CTE names are case-insensitive (both normalized to lowercase).
5433        // MyCte and MYCTE should match.
5434        let expr = parse("WITH MyCte AS (SELECT id AS col FROM source) SELECT * FROM MYCTE");
5435        let node = lineage("col", &expr, None, false).unwrap();
5436        assert_eq!(node.name, "col");
5437        assert!(
5438            !node.downstream.is_empty(),
5439            "Unquoted CTE should resolve case-insensitively"
5440        );
5441    }
5442
5443    #[test]
5444    fn test_lineage_quoted_cte_case_preserved() {
5445        // Quoted CTE name preserves case. "MyCte" referenced as "MyCte" should match.
5446        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "MyCte""#);
5447        let node = lineage("col", &expr, None, false).unwrap();
5448        assert_eq!(node.name, "col");
5449        assert!(
5450            !node.downstream.is_empty(),
5451            "Quoted CTE with matching case should resolve"
5452        );
5453    }
5454
5455    #[test]
5456    fn test_lineage_quoted_cte_case_mismatch_no_expansion() {
5457        // Quoted CTE "MyCte" referenced as "mycte" — case mismatch.
5458        // sqlglot treats this as a table reference, not a CTE match.
5459        // Star expansion should NOT resolve through the CTE.
5460        let expr = parse(r#"WITH "MyCte" AS (SELECT id AS col FROM source) SELECT * FROM "mycte""#);
5461        // lineage("col", ...) should fail because "mycte" is treated as an external
5462        // table (not matching CTE "MyCte"), and SELECT * cannot be expanded.
5463        let result = lineage("col", &expr, None, false);
5464        assert!(
5465            result.is_err(),
5466            "Quoted CTE with case mismatch should not expand star: {:?}",
5467            result
5468        );
5469    }
5470
5471    #[test]
5472    fn test_lineage_mixed_quoted_unquoted_cte() {
5473        // Mix of unquoted and quoted CTEs in a nested chain.
5474        let expr = parse(
5475            r#"WITH unquoted AS (SELECT 1 AS a FROM t), "Quoted" AS (SELECT a FROM unquoted) SELECT * FROM "Quoted""#,
5476        );
5477        let node = lineage("a", &expr, None, false).unwrap();
5478        assert_eq!(node.name, "a");
5479        assert!(
5480            !node.downstream.is_empty(),
5481            "Mixed quoted/unquoted CTE chain should resolve"
5482        );
5483    }
5484
5485    // -----------------------------------------------------------------------
5486    // Known bugs: quoted CTE case sensitivity in scope/lineage tracing paths
5487    // -----------------------------------------------------------------------
5488    //
5489    // expand_cte_stars correctly handles quoted vs unquoted CTE names via
5490    // normalize_cte_name(). However, the scope system (scope.rs add_table_to_scope)
5491    // and the lineage tracing path (to_node_inner) use eq_ignore_ascii_case or
5492    // direct string comparison for CTE name matching, ignoring the quoted status.
5493    //
5494    // sqlglot's normalize_identifiers treats quoted identifiers as case-sensitive
5495    // and unquoted as case-insensitive. The scope system should do the same.
5496    //
5497    // Fixing these requires changes across scope.rs and lineage.rs CTE resolution,
5498    // which is broader than the star expansion scope of this PR.
5499
5500    #[test]
5501    fn test_lineage_quoted_cte_case_mismatch_non_star_known_bug() {
5502        // Known bug: scope.rs add_table_to_scope uses eq_ignore_ascii_case for
5503        // all identifiers including quoted ones, so quoted CTE "MyCte" referenced
5504        // as "mycte" incorrectly resolves to the CTE.
5505        //
5506        // Per SQL semantics (and sqlglot behavior), quoted identifiers are
5507        // case-sensitive: "mycte" should NOT match CTE "MyCte".
5508        //
5509        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
5510        // this test should fail — update the assertion to match correct behavior:
5511        //   child.source_name should be "" (table ref), not "MyCte" (CTE ref).
5512        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT col FROM "mycte""#);
5513        let node = lineage("col", &expr, None, false).unwrap();
5514        assert!(!node.downstream.is_empty());
5515        let child = &node.downstream[0];
5516        // BUG: "mycte" incorrectly resolves to CTE "MyCte"
5517        assert_eq!(
5518            child.source_name, "MyCte",
5519            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5520             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5521        );
5522    }
5523
5524    #[test]
5525    fn test_lineage_quoted_cte_case_mismatch_qualified_col_known_bug() {
5526        // Known bug: same as above but with qualified column reference ("mycte".col).
5527        // scope.rs resolves "mycte" to CTE "MyCte" case-insensitively even for
5528        // quoted identifiers, so "mycte".col incorrectly traces through CTE "MyCte".
5529        //
5530        // This test asserts the CURRENT BUGGY behavior. When the bug is fixed,
5531        // this test should fail — update to assert source_name != "MyCte".
5532        let expr = parse(r#"WITH "MyCte" AS (SELECT 1 AS col) SELECT "mycte".col FROM "mycte""#);
5533        let node = lineage("col", &expr, None, false).unwrap();
5534        assert!(!node.downstream.is_empty());
5535        let child = &node.downstream[0];
5536        // BUG: "mycte".col incorrectly resolves through CTE "MyCte"
5537        assert_eq!(
5538            child.source_name, "MyCte",
5539            "Known bug: quoted CTE case mismatch should NOT resolve, but currently does. \
5540             If this fails, the bug may be fixed — update to assert source_name != \"MyCte\""
5541        );
5542    }
5543
5544    #[test]
5545    fn test_lineage_recursive_cte_terminates_at_base_case() {
5546        let expr = parse_dialect(
5547            "WITH RECURSIVE nums AS (\
5548             SELECT 1 AS n \
5549             UNION ALL \
5550             SELECT n + 1 FROM nums WHERE n < 5\
5551             ) SELECT n FROM nums",
5552            DialectType::DuckDB,
5553        );
5554        let node = lineage("n", &expr, Some(DialectType::DuckDB), false).unwrap();
5555        let names = lineage_names(&node);
5556
5557        assert!(
5558            names.len() <= 12,
5559            "recursive CTE lineage should not unroll repeatedly, got {names:?}"
5560        );
5561        assert!(
5562            node.walk()
5563                .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "nums"),
5564            "expected recursive source to be marked as a CTE, got {names:?}"
5565        );
5566    }
5567
5568    #[test]
5569    fn test_lineage_window_partition_and_order_columns() {
5570        let expr = parse(
5571            "WITH c AS (SELECT user_id, ts FROM events) \
5572             SELECT ROW_NUMBER() OVER (PARTITION BY c.user_id ORDER BY c.ts) AS out FROM c",
5573        );
5574        let node = lineage("out", &expr, None, false).unwrap();
5575
5576        assert_lineage_contains(&node, "events.user_id");
5577        assert_lineage_contains(&node, "events.ts");
5578    }
5579
5580    #[test]
5581    fn test_lineage_window_aggregate_order_column() {
5582        let expr = parse(
5583            "WITH c AS (SELECT amount, d FROM txns) \
5584             SELECT SUM(c.amount) OVER (ORDER BY c.d) AS running FROM c",
5585        );
5586        let node = lineage("running", &expr, None, false).unwrap();
5587
5588        assert_lineage_contains(&node, "txns.amount");
5589        assert_lineage_contains(&node, "txns.d");
5590    }
5591
5592    #[test]
5593    fn test_lineage_named_window_columns() {
5594        let expr = parse(
5595            "SELECT ROW_NUMBER() OVER w AS out \
5596             FROM events \
5597             WINDOW w AS (PARTITION BY user_id ORDER BY ts)",
5598        );
5599        let node = lineage("out", &expr, None, false).unwrap();
5600
5601        assert_lineage_contains(&node, "events.user_id");
5602        assert_lineage_contains(&node, "events.ts");
5603    }
5604
5605    #[test]
5606    fn test_lineage_within_group_order_column() {
5607        let expr =
5608            parse("SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY amount) AS p FROM txns");
5609        let node = lineage("p", &expr, None, false).unwrap();
5610
5611        assert_lineage_contains(&node, "txns.amount");
5612    }
5613
5614    #[test]
5615    fn test_lineage_query_wrappers_resolve_inner_select() {
5616        for sql in [
5617            "CREATE TABLE tgt AS SELECT x FROM src",
5618            "CREATE VIEW v AS SELECT x FROM src",
5619            "INSERT INTO tgt SELECT x FROM src",
5620        ] {
5621            let expr = parse(sql);
5622            let node = lineage("x", &expr, None, false).unwrap();
5623            assert_lineage_contains(&node, "src.x");
5624        }
5625    }
5626
5627    #[test]
5628    fn test_lineage_scalar_subquery_through_cte_reaches_base_table() {
5629        let expr = parse(
5630            "WITH c AS (SELECT x FROM t) \
5631             SELECT (SELECT SUM(x) FROM c) AS s FROM c LIMIT 1",
5632        );
5633        let node = lineage("s", &expr, None, false).unwrap();
5634
5635        assert_lineage_contains(&node, "t.x");
5636        assert!(
5637            node.walk()
5638                .any(|child| child.source_kind == SourceKind::Cte && child.source_name == "c"),
5639            "expected scalar subquery CTE hop in lineage, got {:?}",
5640            lineage_names(&node)
5641        );
5642    }
5643
5644    #[test]
5645    fn test_lineage_scalar_subqueries_inside_expression_wrappers() {
5646        for sql in [
5647            "WITH c AS (SELECT a, b FROM t) \
5648             SELECT CASE WHEN c.a > 0 THEN c.b ELSE (SELECT MAX(z) FROM o) END AS r FROM c",
5649            "WITH c AS (SELECT a FROM t) \
5650             SELECT COALESCE(c.a, (SELECT MAX(z) FROM o)) AS r FROM c",
5651            "WITH c AS (SELECT a FROM t) \
5652             SELECT CAST((SELECT MAX(z) FROM o) AS INT) + c.a AS r FROM c",
5653            "WITH c AS (SELECT a FROM t) \
5654             SELECT CASE WHEN c.a BETWEEN 0 AND (SELECT MAX(z) FROM o) THEN c.a END AS r FROM c",
5655        ] {
5656            let expr = parse_dialect(sql, DialectType::DuckDB);
5657            let node = lineage("r", &expr, Some(DialectType::DuckDB), false)
5658                .unwrap_or_else(|error| panic!("lineage failed for {sql}: {error}"));
5659
5660            assert_lineage_contains(&node, "o.z");
5661            assert_lineage_contains(&node, "t.a");
5662        }
5663    }
5664
5665    #[test]
5666    fn test_lineage_nested_set_operation_inside_derived_table() {
5667        let expr = parse_dialect(
5668            "SELECT v FROM ((SELECT v FROM t1 UNION ALL SELECT v FROM t2) \
5669             UNION ALL SELECT v FROM t3) u",
5670            DialectType::DuckDB,
5671        );
5672        let node = lineage("v", &expr, Some(DialectType::DuckDB), false).unwrap();
5673
5674        assert_lineage_contains(&node, "t1.v");
5675        assert_lineage_contains(&node, "t2.v");
5676        assert_lineage_contains(&node, "t3.v");
5677    }
5678
5679    #[test]
5680    fn test_lineage_select_alias_reference_resolves_to_alias_source() {
5681        let expr = parse_dialect(
5682            "WITH c AS (SELECT x FROM t) SELECT c.x AS a, a + 1 AS b FROM c",
5683            DialectType::DuckDB,
5684        );
5685        let node = lineage("b", &expr, Some(DialectType::DuckDB), false).unwrap();
5686
5687        assert_lineage_contains(&node, "t.x");
5688    }
5689
5690    #[test]
5691    fn test_lineage_pivot_output_resolves_aggregation_input() {
5692        let expr = parse_dialect(
5693            "SELECT * FROM (SELECT region, q, amt FROM sales) \
5694             PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5695            DialectType::DuckDB,
5696        );
5697        let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5698
5699        assert_lineage_contains(&node, "sales.amt");
5700    }
5701
5702    #[test]
5703    fn test_lineage_pivot_multi_aggregate_and_alias_columns() {
5704        let multi = parse_dialect(
5705            "SELECT * FROM (SELECT category, value, price FROM t) \
5706             PIVOT(SUM(value) AS value_sum, MAX(price) FOR category IN ('a' AS cat_a, 'b'))",
5707            DialectType::DuckDB,
5708        );
5709        let value_sum =
5710            lineage("cat_a_value_sum", &multi, Some(DialectType::DuckDB), false).unwrap();
5711        assert_lineage_contains(&value_sum, "t.value");
5712
5713        let max_price =
5714            lineage("cat_a_max(price)", &multi, Some(DialectType::DuckDB), false).unwrap();
5715        assert_lineage_contains(&max_price, "t.price");
5716
5717        let aliased = parse_dialect(
5718            "SELECT * FROM (SELECT region, q, amt FROM sales) \
5719             PIVOT(SUM(amt) FOR q IN ('Q1')) AS p(region2, p1)",
5720            DialectType::DuckDB,
5721        );
5722        let region = lineage("region2", &aliased, Some(DialectType::DuckDB), false).unwrap();
5723        assert_lineage_contains(&region, "sales.region");
5724
5725        let pivot_value = lineage("p1", &aliased, Some(DialectType::DuckDB), false).unwrap();
5726        assert_lineage_contains(&pivot_value, "sales.amt");
5727    }
5728
5729    #[test]
5730    fn test_lineage_pivot_through_cte_resolves_aggregation_input() {
5731        let expr = parse_dialect(
5732            "WITH src AS (SELECT region, q, amt FROM sales) \
5733             SELECT q1 FROM src PIVOT(SUM(amt) FOR q IN ('Q1' AS q1))",
5734            DialectType::DuckDB,
5735        );
5736        let node = lineage("q1", &expr, Some(DialectType::DuckDB), false).unwrap();
5737
5738        assert_lineage_contains(&node, "sales.amt");
5739    }
5740
5741    #[test]
5742    fn test_lineage_unpivot_value_resolves_input_columns() {
5743        let expr = parse_dialect(
5744            "SELECT name, val FROM t UNPIVOT(val FOR col IN (a, b, c))",
5745            DialectType::DuckDB,
5746        );
5747        let node = lineage("val", &expr, Some(DialectType::DuckDB), false).unwrap();
5748
5749        assert_lineage_contains(&node, "t.a");
5750        assert_lineage_contains(&node, "t.b");
5751        assert_lineage_contains(&node, "t.c");
5752    }
5753
5754    #[test]
5755    fn test_lineage_unpivot_multi_value_columns_resolve_positionally() {
5756        let expr = parse_dialect(
5757            "SELECT first_half_sales, second_half_sales, semester \
5758             FROM produce \
5759             UNPIVOT((first_half_sales, second_half_sales) \
5760             FOR semester IN ((q1, q2) AS 'semester_1', (q3, q4) AS 'semester_2'))",
5761            DialectType::BigQuery,
5762        );
5763
5764        let first = lineage(
5765            "first_half_sales",
5766            &expr,
5767            Some(DialectType::BigQuery),
5768            false,
5769        )
5770        .unwrap();
5771        assert_lineage_contains(&first, "produce.q1");
5772        assert_lineage_contains(&first, "produce.q3");
5773
5774        let second = lineage(
5775            "second_half_sales",
5776            &expr,
5777            Some(DialectType::BigQuery),
5778            false,
5779        )
5780        .unwrap();
5781        assert_lineage_contains(&second, "produce.q2");
5782        assert_lineage_contains(&second, "produce.q4");
5783    }
5784
5785    #[test]
5786    fn test_lineage_top_level_union_over_ctes_reaches_base_tables() {
5787        let expr = parse(
5788            "WITH a AS (SELECT x FROM t1), b AS (SELECT x FROM t2) \
5789             SELECT x FROM a UNION SELECT x FROM b",
5790        );
5791        let node = lineage("x", &expr, None, false).unwrap();
5792
5793        assert_lineage_contains(&node, "t1.x");
5794        assert_lineage_contains(&node, "t2.x");
5795    }
5796
5797    #[test]
5798    fn test_lineage_star_excludes_semi_join_rhs_source() {
5799        let expr = parse_dialect(
5800            "SELECT * FROM orders LEFT SEMI JOIN customers ON orders.customer_id = customers.id",
5801            DialectType::DuckDB,
5802        );
5803        let node = lineage("customer_id", &expr, Some(DialectType::DuckDB), false).unwrap();
5804
5805        assert_lineage_contains(&node, "orders.customer_id");
5806    }
5807
5808    // --- Comment handling tests (ported from sqlglot test_lineage.py) ---
5809
5810    /// sqlglot: test_node_name_doesnt_contain_comment
5811    /// Comments in column expressions should not affect lineage resolution.
5812    /// NOTE: This test uses SELECT * from a derived table, which is a separate
5813    /// known limitation in polyglot-sql (star expansion in subqueries).
5814    #[test]
5815    #[ignore = "requires derived table star expansion (separate issue)"]
5816    fn test_node_name_doesnt_contain_comment() {
5817        let expr = parse("SELECT * FROM (SELECT x /* c */ FROM t1) AS t2");
5818        let node = lineage("x", &expr, None, false).unwrap();
5819
5820        assert_eq!(node.name, "x");
5821        assert!(!node.downstream.is_empty());
5822    }
5823
5824    /// A line comment between SELECT and the first column wraps the column
5825    /// in an Annotated node. Lineage must unwrap it to find the column name.
5826    /// Verify that commented and uncommented queries produce identical lineage.
5827    #[test]
5828    fn test_comment_before_first_column_in_cte() {
5829        let sql_with_comment = "with t as (select 1 as a) select\n  -- comment\n  a from t";
5830        let sql_without_comment = "with t as (select 1 as a) select a from t";
5831
5832        // Without comment — baseline
5833        let expr_ok = parse(sql_without_comment);
5834        let node_ok = lineage("a", &expr_ok, None, false).expect("without comment should succeed");
5835
5836        // With comment — should produce identical lineage
5837        let expr_comment = parse(sql_with_comment);
5838        let node_comment = lineage("a", &expr_comment, None, false)
5839            .expect("with comment before first column should succeed");
5840
5841        assert_eq!(node_ok.name, node_comment.name, "node names should match");
5842        assert_eq!(
5843            node_ok.downstream_names(),
5844            node_comment.downstream_names(),
5845            "downstream lineage should be identical with or without comment"
5846        );
5847    }
5848
5849    /// Block comment between SELECT and first column.
5850    #[test]
5851    fn test_block_comment_before_first_column() {
5852        let sql = "with t as (select 1 as a) select /* section */ a from t";
5853        let expr = parse(sql);
5854        let node = lineage("a", &expr, None, false)
5855            .expect("block comment before first column should succeed");
5856        assert_eq!(node.name, "a");
5857        assert!(
5858            !node.downstream.is_empty(),
5859            "should have downstream lineage"
5860        );
5861    }
5862
5863    /// Comment before first column should not affect second column resolution.
5864    #[test]
5865    fn test_comment_before_first_column_second_col_ok() {
5866        let sql = "with t as (select 1 as a, 2 as b) select\n  -- comment\n  a, b from t";
5867        let expr = parse(sql);
5868
5869        let node_a =
5870            lineage("a", &expr, None, false).expect("column a with comment should succeed");
5871        assert_eq!(node_a.name, "a");
5872
5873        let node_b =
5874            lineage("b", &expr, None, false).expect("column b with comment should succeed");
5875        assert_eq!(node_b.name, "b");
5876    }
5877
5878    /// Aliased column with preceding comment.
5879    #[test]
5880    fn test_comment_before_aliased_column() {
5881        let sql = "with t as (select 1 as x) select\n  -- renamed\n  x as y from t";
5882        let expr = parse(sql);
5883        let node =
5884            lineage("y", &expr, None, false).expect("aliased column with comment should succeed");
5885        assert_eq!(node.name, "y");
5886        assert!(
5887            !node.downstream.is_empty(),
5888            "aliased column should have downstream lineage"
5889        );
5890    }
5891}