Skip to main content

sqlglot_rust/optimizer/
lineage.rs

1//! Column lineage tracking for SQL queries.
2//!
3//! Provides functionality to trace data flow from source columns through
4//! query transformations to output columns. This is the foundation for
5//! data governance tools and impact analysis.
6//!
7//! Inspired by Python sqlglot's `lineage.py`.
8//!
9//! # Example
10//!
11//! ```rust
12//! use sqlglot_rust::parser::parse;
13//! use sqlglot_rust::dialects::Dialect;
14//! use sqlglot_rust::optimizer::lineage::{lineage, LineageConfig};
15//! use sqlglot_rust::schema::MappingSchema;
16//!
17//! let sql = "SELECT a, b + 1 AS c FROM t";
18//! let ast = parse(sql, Dialect::Ansi).unwrap();
19//! let schema = MappingSchema::new(Dialect::Ansi);
20//! let config = LineageConfig::default();
21//!
22//! let graph = lineage("c", &ast, &schema, &config).unwrap();
23//! assert_eq!(graph.node.name, "c");
24//! ```
25
26use std::collections::{HashMap, HashSet};
27
28use crate::ast::*;
29use crate::dialects::Dialect;
30use crate::errors::SqlglotError;
31use crate::schema::{MappingSchema, Schema};
32
33// ═══════════════════════════════════════════════════════════════════════
34// Error types
35// ═══════════════════════════════════════════════════════════════════════
36
/// Failure modes reported while building column lineage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LineageError {
    /// The requested column does not appear in the query output.
    ColumnNotFound(String),
    /// The column reference is ambiguous (multiple candidate sources).
    AmbiguousColumn(String),
    /// The statement's structure is not usable for lineage analysis.
    InvalidQuery(String),
    /// SQL text failed to parse; the payload carries the parser's message.
    ParseError(String),
}

impl std::fmt::Display for LineageError {
    /// Writes a fixed, variant-specific prefix followed by the variant's payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ColumnNotFound(column) => {
                write!(f, "Column not found in output: {column}")
            }
            Self::AmbiguousColumn(column) => {
                write!(f, "Ambiguous column reference: {column}")
            }
            Self::InvalidQuery(reason) => {
                write!(f, "Invalid query for lineage: {reason}")
            }
            Self::ParseError(reason) => {
                write!(f, "Parse error: {reason}")
            }
        }
    }
}

impl std::error::Error for LineageError {}
62
63impl From<LineageError> for SqlglotError {
64    fn from(e: LineageError) -> Self {
65        SqlglotError::Internal(e.to_string())
66    }
67}
68
69/// Result type for lineage operations.
70pub type LineageResult<T> = std::result::Result<T, LineageError>;
71
72// ═══════════════════════════════════════════════════════════════════════
73// Configuration
74// ═══════════════════════════════════════════════════════════════════════
75
76/// Configuration for lineage analysis.
77#[derive(Debug, Clone)]
78pub struct LineageConfig {
79    /// SQL dialect for parsing and identifier normalization.
80    pub dialect: Dialect,
81    /// Whether to trim column qualifiers in output node names.
82    pub trim_qualifiers: bool,
83    /// External sources mapping for multi-query lineage.
84    /// Maps source names to their SQL definitions (e.g., views).
85    pub sources: HashMap<String, String>,
86}
87
88impl Default for LineageConfig {
89    fn default() -> Self {
90        Self {
91            dialect: Dialect::Ansi,
92            trim_qualifiers: true,
93            sources: HashMap::new(),
94        }
95    }
96}
97
98impl LineageConfig {
99    /// Create a new configuration with the specified dialect.
100    #[must_use]
101    pub fn new(dialect: Dialect) -> Self {
102        Self {
103            dialect,
104            ..Default::default()
105        }
106    }
107
108    /// Add external sources for multi-query lineage.
109    #[must_use]
110    pub fn with_sources(mut self, sources: HashMap<String, String>) -> Self {
111        self.sources = sources;
112        self
113    }
114
115    /// Set whether to trim table qualifiers from output names.
116    #[must_use]
117    pub fn with_trim_qualifiers(mut self, trim: bool) -> Self {
118        self.trim_qualifiers = trim;
119        self
120    }
121}
122
123// ═══════════════════════════════════════════════════════════════════════
124// Lineage Node
125// ═══════════════════════════════════════════════════════════════════════
126
127/// A node in the lineage graph representing a column or expression.
128#[derive(Debug, Clone)]
129pub struct LineageNode {
130    /// The name of this column/expression (e.g., "a", "SUM(b)", "t.col").
131    pub name: String,
132    /// The AST expression this node represents.
133    pub expression: Option<Expr>,
134    /// The source table/CTE/subquery name, if applicable.
135    pub source_name: Option<String>,
136    /// Reference to the source AST (for complex expressions).
137    pub source: Option<Expr>,
138    /// Child nodes (upstream lineage - where data comes from).
139    pub downstream: Vec<LineageNode>,
140    /// The alias, if this is an aliased expression.
141    pub alias: Option<String>,
142    /// Depth in the lineage graph (0 = root output column).
143    pub depth: usize,
144}
145
146impl LineageNode {
147    /// Create a new lineage node.
148    #[must_use]
149    pub fn new(name: String) -> Self {
150        Self {
151            name,
152            expression: None,
153            source_name: None,
154            source: None,
155            downstream: Vec::new(),
156            alias: None,
157            depth: 0,
158        }
159    }
160
161    /// Create a node with source information.
162    #[must_use]
163    pub fn with_source(mut self, source_name: String) -> Self {
164        self.source_name = Some(source_name);
165        self
166    }
167
168    /// Create a node with an expression.
169    #[must_use]
170    pub fn with_expression(mut self, expr: Expr) -> Self {
171        self.expression = Some(expr);
172        self
173    }
174
175    /// Create a node with an alias.
176    #[must_use]
177    #[allow(dead_code)]
178    pub fn with_alias(mut self, alias: String) -> Self {
179        self.alias = Some(alias);
180        self
181    }
182
183    /// Create a node with depth.
184    #[must_use]
185    pub fn with_depth(mut self, depth: usize) -> Self {
186        self.depth = depth;
187        self
188    }
189
190    /// Add a downstream (upstream lineage) node.
191    #[allow(dead_code)]
192    pub fn add_downstream(&mut self, node: LineageNode) {
193        self.downstream.push(node);
194    }
195
196    /// Walk through all nodes in the lineage graph (pre-order).
197    pub fn walk<F>(&self, visitor: &mut F)
198    where
199        F: FnMut(&LineageNode),
200    {
201        visitor(self);
202        for child in &self.downstream {
203            child.walk(visitor);
204        }
205    }
206
207    /// Iterate over all nodes in the lineage graph.
208    #[must_use]
209    pub fn iter(&self) -> LineageIterator<'_> {
210        LineageIterator {
211            stack: vec![self],
212        }
213    }
214
215    /// Get all source columns (leaf nodes) in this lineage.
216    #[must_use]
217    #[allow(dead_code)]
218    pub fn source_columns(&self) -> Vec<&LineageNode> {
219        self.iter().filter(|n| n.downstream.is_empty()).collect()
220    }
221
222    /// Get all source table names referenced in this lineage.
223    #[must_use]
224    pub fn source_tables(&self) -> Vec<String> {
225        let mut tables = HashSet::new();
226        for node in self.iter() {
227            if let Some(ref source) = node.source_name {
228                tables.insert(source.clone());
229            }
230        }
231        tables.into_iter().collect()
232    }
233
234    /// Generate DOT format representation for visualization.
235    #[must_use]
236    pub fn to_dot(&self) -> String {
237        let mut dot = String::from("digraph lineage {\n");
238        dot.push_str("  rankdir=BT;\n");
239        dot.push_str("  node [shape=box];\n");
240        
241        let mut node_id = 0;
242        let mut node_ids = HashMap::new();
243        
244        // First pass: assign IDs and create nodes
245        self.walk(&mut |node| {
246            let id = format!("n{}", node_id);
247            let label = if let Some(ref src) = node.source_name {
248                format!("{}.{}", src, node.name)
249            } else {
250                node.name.clone()
251            };
252            dot.push_str(&format!("  {} [label=\"{}\"];\n", id, escape_dot(&label)));
253            node_ids.insert(node as *const _ as usize, id);
254            node_id += 1;
255        });
256        
257        // Second pass: create edges
258        self.walk(&mut |node| {
259            let parent_id = node_ids.get(&(node as *const _ as usize)).unwrap();
260            for child in &node.downstream {
261                let child_id = node_ids.get(&(child as *const _ as usize)).unwrap();
262                dot.push_str(&format!("  {} -> {};\n", child_id, parent_id));
263            }
264        });
265        
266        dot.push_str("}\n");
267        dot
268    }
269
270    /// Generate Mermaid diagram representation.
271    #[must_use]
272    pub fn to_mermaid(&self) -> String {
273        let mut mermaid = String::from("flowchart BT\n");
274        
275        let mut node_id = 0;
276        let mut node_ids = HashMap::new();
277        
278        // First pass: assign IDs and create nodes
279        self.walk(&mut |node| {
280            let id = format!("n{}", node_id);
281            let label = if let Some(ref src) = node.source_name {
282                format!("{}.{}", src, node.name)
283            } else {
284                node.name.clone()
285            };
286            mermaid.push_str(&format!("  {}[\"{}\"]\n", id, escape_mermaid(&label)));
287            node_ids.insert(node as *const _ as usize, id);
288            node_id += 1;
289        });
290        
291        // Second pass: create edges
292        self.walk(&mut |node| {
293            let parent_id = node_ids.get(&(node as *const _ as usize)).unwrap();
294            for child in &node.downstream {
295                let child_id = node_ids.get(&(child as *const _ as usize)).unwrap();
296                mermaid.push_str(&format!("  {} --> {}\n", child_id, parent_id));
297            }
298        });
299        
300        mermaid
301    }
302}
303
304/// Iterator over lineage nodes (pre-order traversal).
305pub struct LineageIterator<'a> {
306    stack: Vec<&'a LineageNode>,
307}
308
309impl<'a> Iterator for LineageIterator<'a> {
310    type Item = &'a LineageNode;
311
312    fn next(&mut self) -> Option<Self::Item> {
313        self.stack.pop().map(|node| {
314            // Push children in reverse order for pre-order traversal
315            for child in node.downstream.iter().rev() {
316                self.stack.push(child);
317            }
318            node
319        })
320    }
321}
322
323// ═══════════════════════════════════════════════════════════════════════
324// Lineage Graph
325// ═══════════════════════════════════════════════════════════════════════
326
327/// A lineage graph rooted at a specific output column.
328#[derive(Debug, Clone)]
329pub struct LineageGraph {
330    /// The root node representing the target output column.
331    pub node: LineageNode,
332    /// The original SQL that was analyzed.
333    pub sql: Option<String>,
334    /// The dialect used for analysis.
335    pub dialect: Dialect,
336}
337
338impl LineageGraph {
339    /// Create a new lineage graph.
340    #[must_use]
341    pub fn new(node: LineageNode, dialect: Dialect) -> Self {
342        Self {
343            node,
344            sql: None,
345            dialect,
346        }
347    }
348
349    /// Set the original SQL string.
350    #[must_use]
351    #[allow(dead_code)]
352    pub fn with_sql(mut self, sql: String) -> Self {
353        self.sql = Some(sql);
354        self
355    }
356
357    /// Get all source tables in the lineage.
358    #[must_use]
359    pub fn source_tables(&self) -> Vec<String> {
360        self.node.source_tables()
361    }
362
363    /// Get all source columns (leaf nodes).
364    #[must_use]
365    #[allow(dead_code)]
366    pub fn source_columns(&self) -> Vec<&LineageNode> {
367        self.node.source_columns()
368    }
369
370    /// Walk through all nodes in the graph.
371    #[allow(dead_code)]
372    pub fn walk<F>(&self, visitor: &mut F)
373    where
374        F: FnMut(&LineageNode),
375    {
376        self.node.walk(visitor);
377    }
378
379    /// Generate DOT format visualization.
380    #[must_use]
381    pub fn to_dot(&self) -> String {
382        self.node.to_dot()
383    }
384
385    /// Generate Mermaid diagram visualization.
386    #[must_use]
387    pub fn to_mermaid(&self) -> String {
388        self.node.to_mermaid()
389    }
390}
391
392// ═══════════════════════════════════════════════════════════════════════
393// Context for lineage building
394// ═══════════════════════════════════════════════════════════════════════
395
396/// Internal context for building lineage graphs.
397struct LineageContext {
398    /// The schema for column resolution.
399    schema: MappingSchema,
400    /// Configuration options.
401    config: LineageConfig,
402    /// Current depth in the lineage graph.
403    depth: usize,
404    /// CTE definitions available in this scope (owned).
405    ctes: HashMap<String, Statement>,
406    /// Visible sources in current scope (alias/name → source info).
407    sources: HashMap<String, SourceInfo>,
408    /// External sources for multi-query lineage.
409    external_sources: HashMap<String, Statement>,
410    /// Sources currently being visited (to prevent infinite recursion).
411    visiting: HashSet<String>,
412}
413
414/// Information about a source (table, CTE, derived table).
415#[derive(Debug, Clone)]
416struct SourceInfo {
417    /// The source type.
418    kind: SourceKind,
419    /// For subqueries/CTEs, the SELECT columns.
420    columns: Option<Vec<SelectItem>>,
421    /// The underlying statement, if any (owned).
422    statement: Option<Statement>,
423}
424
425#[derive(Debug, Clone, Copy, PartialEq, Eq)]
426#[allow(dead_code)]
427enum SourceKind {
428    Table,
429    Cte,
430    DerivedTable,
431    External,
432}
433
434impl LineageContext {
435    fn new(schema: &MappingSchema, config: &LineageConfig) -> Self {
436        Self {
437            schema: schema.clone(),
438            config: config.clone(),
439            depth: 0,
440            ctes: HashMap::new(),
441            sources: HashMap::new(),
442            external_sources: HashMap::new(),
443            visiting: HashSet::new(),
444        }
445    }
446
447    fn with_depth(&self, depth: usize) -> Self {
448        Self {
449            schema: self.schema.clone(),
450            config: self.config.clone(),
451            depth,
452            ctes: self.ctes.clone(),
453            sources: self.sources.clone(),
454            external_sources: self.external_sources.clone(),
455            visiting: self.visiting.clone(),
456        }
457    }
458
459    #[allow(dead_code)]
460    fn resolve_source(&self, name: &str) -> Option<&SourceInfo> {
461        let normalized = normalize_name(name, self.config.dialect);
462        self.sources.get(&normalized)
463    }
464}
465
466// ═══════════════════════════════════════════════════════════════════════
467// Public API
468// ═══════════════════════════════════════════════════════════════════════
469
470/// Build lineage for a specific output column in a SQL statement.
471///
472/// # Arguments
473///
474/// * `column` - The name of the output column to trace (can include table qualifier).
475/// * `statement` - The parsed SQL statement.
476/// * `schema` - Schema information for table/column resolution.
477/// * `config` - Configuration options.
478///
479/// # Returns
480///
481/// A [`LineageGraph`] rooted at the target column, showing its upstream lineage.
482///
483/// # Errors
484///
485/// Returns [`LineageError::ColumnNotFound`] if the column is not in the output.
486///
487/// # Example
488///
489/// ```rust
490/// use sqlglot_rust::parser::parse;
491/// use sqlglot_rust::dialects::Dialect;
492/// use sqlglot_rust::optimizer::lineage::{lineage, LineageConfig};
493/// use sqlglot_rust::schema::MappingSchema;
494///
495/// let sql = "SELECT a, b AS c FROM t";
496/// let ast = parse(sql, Dialect::Ansi).unwrap();
497/// let schema = MappingSchema::new(Dialect::Ansi);
498/// let config = LineageConfig::default();
499///
500/// let graph = lineage("c", &ast, &schema, &config).unwrap();
501/// assert_eq!(graph.node.name, "c");
502/// ```
503pub fn lineage(
504    column: &str,
505    statement: &Statement,
506    schema: &MappingSchema,
507    config: &LineageConfig,
508) -> LineageResult<LineageGraph> {
509    // Parse external sources if provided
510    let mut ctx = LineageContext::new(schema, config);
511    
512    for (name, sql) in &config.sources {
513        match crate::parser::parse(sql, config.dialect) {
514            Ok(stmt) => {
515                ctx.external_sources.insert(normalize_name(name, config.dialect), stmt);
516            }
517            Err(e) => return Err(LineageError::ParseError(e.to_string())),
518        }
519    }
520
521    // Build lineage for the target column
522    let node = build_lineage_for_column(column, statement, &mut ctx)?;
523    
524    Ok(LineageGraph::new(node, config.dialect))
525}
526
527/// Build lineage from a SQL string.
528///
529/// Convenience function that parses the SQL and builds lineage.
530///
531/// # Example
532///
533/// ```rust
534/// use sqlglot_rust::dialects::Dialect;
535/// use sqlglot_rust::optimizer::lineage::{lineage_sql, LineageConfig};
536/// use sqlglot_rust::schema::MappingSchema;
537///
538/// let schema = MappingSchema::new(Dialect::Ansi);
539/// let config = LineageConfig::default();
540///
541/// let graph = lineage_sql("c", "SELECT a + b AS c FROM t", &schema, &config).unwrap();
542/// assert_eq!(graph.node.name, "c");
543/// ```
544pub fn lineage_sql(
545    column: &str,
546    sql: &str,
547    schema: &MappingSchema,
548    config: &LineageConfig,
549) -> LineageResult<LineageGraph> {
550    let statement = crate::parser::parse(sql, config.dialect)
551        .map_err(|e| LineageError::ParseError(e.to_string()))?;
552    
553    let mut graph = lineage(column, &statement, schema, config)?;
554    graph.sql = Some(sql.to_string());
555    Ok(graph)
556}
557
558// ═══════════════════════════════════════════════════════════════════════
559// Internal lineage building
560// ═══════════════════════════════════════════════════════════════════════
561
562/// Build lineage for a specific column in a statement.
563fn build_lineage_for_column(
564    column: &str,
565    statement: &Statement,
566    ctx: &mut LineageContext,
567) -> LineageResult<LineageNode> {
568    match statement {
569        Statement::Select(sel) => build_lineage_for_select_column(column, sel, ctx),
570        Statement::SetOperation(set_op) => build_lineage_for_set_operation(column, set_op, ctx),
571        Statement::CreateView(cv) => build_lineage_for_column(column, &cv.query, ctx),
572        _ => Err(LineageError::InvalidQuery(
573            "Lineage analysis requires a SELECT or set operation statement".to_string(),
574        )),
575    }
576}
577
578/// Build lineage for a column in a SELECT statement.
579fn build_lineage_for_select_column(
580    column: &str,
581    sel: &SelectStatement,
582    ctx: &mut LineageContext,
583) -> LineageResult<LineageNode> {
584    // Register CTEs (cloning to avoid lifetime issues)
585    for cte in &sel.ctes {
586        let cte_name = normalize_name(&cte.name, ctx.config.dialect);
587        ctx.ctes.insert(cte_name.clone(), (*cte.query).clone());
588        ctx.sources.insert(
589            cte_name,
590            SourceInfo {
591                kind: SourceKind::Cte,
592                columns: extract_select_columns(&cte.query),
593                statement: Some((*cte.query).clone()),
594            },
595        );
596    }
597
598    // Register FROM source
599    if let Some(from) = &sel.from {
600        register_table_source(&from.source, ctx);
601    }
602
603    // Register JOINs
604    for join in &sel.joins {
605        register_table_source(&join.table, ctx);
606    }
607
608    // Find the target column in the SELECT list
609    let (col_name, table_qual) = parse_column_ref(column);
610    
611    for item in &sel.columns {
612        match item {
613            SelectItem::Expr { expr, alias } => {
614                let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
615                    expr_output_name(expr)
616                });
617                
618                if matches_column_name(item_name, &col_name) {
619                    return build_lineage_for_expr(expr, alias.clone(), ctx);
620                }
621            }
622            SelectItem::Wildcard => {
623                // Expand wildcard - check all sources
624                for (source_name, source_info) in ctx.sources.clone() {
625                    if let Some(cols) = &source_info.columns {
626                        for col_item in cols {
627                            if let SelectItem::Expr { expr, alias } = col_item {
628                                let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
629                                    expr_output_name(expr)
630                                });
631                                if matches_column_name(item_name, &col_name) {
632                                    return build_lineage_for_expr(expr, alias.clone(), ctx);
633                                }
634                            }
635                        }
636                    } else if source_info.kind == SourceKind::Table {
637                        // Check schema for table columns
638                        if let Ok(schema_cols) = ctx.schema.column_names(&[&source_name]) {
639                            if schema_cols.iter().any(|c| matches_column_name(c, &col_name)) {
640                                // Found in schema
641                                let mut node = LineageNode::new(col_name.clone())
642                                    .with_source(source_name.clone())
643                                    .with_depth(ctx.depth);
644                                node.expression = Some(Expr::Column {
645                                    table: Some(source_name.clone()),
646                                    name: col_name.clone(),
647                                    quote_style: QuoteStyle::None,
648                                    table_quote_style: QuoteStyle::None,
649                                });
650                                return Ok(node);
651                            }
652                        }
653                    }
654                }
655            }
656            SelectItem::QualifiedWildcard { table } => {
657                if table_qual.as_ref().is_some_and(|t| matches_column_name(t, table)) {
658                    // Check if column exists in this table
659                    if let Some(source_info) = ctx.sources.get(table).cloned() {
660                        if let Some(cols) = &source_info.columns {
661                            for col_item in cols {
662                                if let SelectItem::Expr { expr, alias } = col_item {
663                                    let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
664                                        expr_output_name(expr)
665                                    });
666                                    if matches_column_name(item_name, &col_name) {
667                                        return build_lineage_for_expr(expr, alias.clone(), ctx);
668                                    }
669                                }
670                            }
671                        }
672                    }
673                }
674            }
675        }
676    }
677
678    Err(LineageError::ColumnNotFound(column.to_string()))
679}
680
681/// Build lineage for a set operation (UNION, INTERSECT, EXCEPT).
682fn build_lineage_for_set_operation(
683    column: &str,
684    set_op: &SetOperationStatement,
685    ctx: &mut LineageContext,
686) -> LineageResult<LineageNode> {
687    let mut root = LineageNode::new(column.to_string()).with_depth(ctx.depth);
688    
689    // Build lineage from both branches
690    let mut child_ctx = ctx.with_depth(ctx.depth + 1);
691    
692    let left_lineage = build_lineage_for_column(column, &set_op.left, &mut child_ctx)?;
693    let right_lineage = build_lineage_for_column(column, &set_op.right, &mut child_ctx)?;
694    
695    root.downstream.push(left_lineage);
696    root.downstream.push(right_lineage);
697    
698    Ok(root)
699}
700
701/// Build lineage for an expression.
702fn build_lineage_for_expr(
703    expr: &Expr,
704    alias: Option<String>,
705    ctx: &mut LineageContext,
706) -> LineageResult<LineageNode> {
707    let name = alias.clone().unwrap_or_else(|| expr_to_name(expr, ctx.config.trim_qualifiers));
708    let mut node = LineageNode::new(name.clone())
709        .with_expression(expr.clone())
710        .with_depth(ctx.depth);
711    
712    if let Some(a) = alias {
713        node.alias = Some(a);
714    }
715
716    // Collect column references from the expression
717    let columns = collect_expr_columns(expr);
718    
719    let mut child_ctx = ctx.with_depth(ctx.depth + 1);
720    
721    for col_ref in columns {
722        let child_node = resolve_column_lineage(&col_ref, &mut child_ctx)?;
723        node.downstream.push(child_node);
724    }
725    
726    Ok(node)
727}
728
729/// Resolve lineage for a column reference.
730fn resolve_column_lineage(
731    col: &ColumnReference,
732    ctx: &mut LineageContext,
733) -> LineageResult<LineageNode> {
734    let name = if ctx.config.trim_qualifiers {
735        col.name.clone()
736    } else {
737        col.qualified_name()
738    };
739    
740    // If table qualifier is provided, look up in that source
741    if let Some(ref table) = col.table {
742        let normalized_table = normalize_name(table, ctx.config.dialect);
743        
744        if let Some(source_info) = ctx.sources.get(&normalized_table).cloned() {
745            match source_info.kind {
746                SourceKind::Table => {
747                    // Base table - this is a leaf node
748                    let node = LineageNode::new(name)
749                        .with_source(normalized_table)
750                        .with_depth(ctx.depth);
751                    return Ok(node);
752                }
753                SourceKind::Cte | SourceKind::DerivedTable => {
754                    // Recurse into CTE/derived table (if not already visiting)
755                    if !ctx.visiting.contains(&normalized_table) {
756                        if let Some(stmt) = source_info.statement {
757                            ctx.visiting.insert(normalized_table.clone());
758                            let result = build_lineage_for_column(&col.name, &stmt, ctx);
759                            ctx.visiting.remove(&normalized_table);
760                            return result;
761                        }
762                    }
763                    // If already visiting, treat as leaf
764                    let node = LineageNode::new(name)
765                        .with_source(normalized_table)
766                        .with_depth(ctx.depth);
767                    return Ok(node);
768                }
769                SourceKind::External => {
770                    // Check external sources
771                    if let Some(stmt) = ctx.external_sources.get(&normalized_table).cloned() {
772                        return build_lineage_for_column(&col.name, &stmt, ctx);
773                    }
774                }
775            }
776        }
777    }
778    
779    // No table qualifier - search all sources
780    for (source_name, source_info) in ctx.sources.clone() {
781        match source_info.kind {
782            SourceKind::Table => {
783                // Check if this table has the column
784                if ctx.schema.has_column(&[&source_name], &col.name) {
785                    let node = LineageNode::new(name)
786                        .with_source(source_name.clone())
787                        .with_depth(ctx.depth);
788                    return Ok(node);
789                }
790            }
791            SourceKind::Cte | SourceKind::DerivedTable => {
792                // Skip if already visiting this source
793                if ctx.visiting.contains(&source_name) {
794                    continue;
795                }
796                // Check if CTE/derived table outputs this column
797                if let Some(ref columns) = source_info.columns {
798                    if columns.iter().any(|c| select_item_has_column(c, &col.name)) {
799                        if let Some(stmt) = source_info.statement {
800                            ctx.visiting.insert(source_name.clone());
801                            let result = build_lineage_for_column(&col.name, &stmt, ctx);
802                            ctx.visiting.remove(&source_name);
803                            return result;
804                        }
805                    }
806                }
807            }
808            SourceKind::External => {}
809        }
810    }
811    
812    // Column not found in any known source - treat as external/unknown
813    let node = LineageNode::new(name).with_depth(ctx.depth);
814    Ok(node)
815}
816
817/// Register a table source in the context.
818fn register_table_source(source: &TableSource, ctx: &mut LineageContext) {
819    match source {
820        TableSource::Table(table_ref) => {
821            let key = table_ref.alias.as_ref().unwrap_or(&table_ref.name).clone();
822            let normalized = normalize_name(&key, ctx.config.dialect);
823            // Don't overwrite CTEs or derived tables
824            if !ctx.sources.contains_key(&normalized) {
825                ctx.sources.insert(
826                    normalized,
827                    SourceInfo {
828                        kind: SourceKind::Table,
829                        columns: None,
830                        statement: None,
831                    },
832                );
833            }
834        }
835        TableSource::Subquery { query, alias } => {
836            if let Some(alias) = alias {
837                let normalized = normalize_name(alias, ctx.config.dialect);
838                ctx.sources.insert(
839                    normalized,
840                    SourceInfo {
841                        kind: SourceKind::DerivedTable,
842                        columns: extract_select_columns(query),
843                        statement: Some((**query).clone()),
844                    },
845                );
846            }
847        }
848        TableSource::Lateral { source } => {
849            register_table_source(source, ctx);
850        }
851        TableSource::Pivot { source, alias, .. } | TableSource::Unpivot { source, alias, .. } => {
852            register_table_source(source, ctx);
853            // TODO: Track pivot/unpivot column transformations
854            if let Some(alias) = alias {
855                let normalized = normalize_name(alias, ctx.config.dialect);
856                ctx.sources.insert(
857                    normalized,
858                    SourceInfo {
859                        kind: SourceKind::DerivedTable,
860                        columns: None,
861                        statement: None,
862                    },
863                );
864            }
865        }
866        TableSource::TableFunction { alias, .. } => {
867            if let Some(alias) = alias {
868                let normalized = normalize_name(alias, ctx.config.dialect);
869                ctx.sources.insert(
870                    normalized,
871                    SourceInfo {
872                        kind: SourceKind::Table,
873                        columns: None,
874                        statement: None,
875                    },
876                );
877            }
878        }
879        TableSource::Unnest { alias, .. } => {
880            if let Some(alias) = alias {
881                let normalized = normalize_name(alias, ctx.config.dialect);
882                ctx.sources.insert(
883                    normalized,
884                    SourceInfo {
885                        kind: SourceKind::Table,
886                        columns: None,
887                        statement: None,
888                    },
889                );
890            }
891        }
892    }
893}
894
// ═══════════════════════════════════════════════════════════════════════
// Helper types and functions
// ═══════════════════════════════════════════════════════════════════════
898
/// A column reference found in an expression.
///
/// Holds the pieces of a (possibly table-qualified) column so that callers can
/// inspect the qualifier and the column name separately.
#[derive(Debug, Clone)]
struct ColumnReference {
    /// Table qualifier of the reference, if it had one.
    table: Option<String>,
    /// Column name.
    name: String,
}

impl ColumnReference {
    /// Render the reference as `table.name`, or just `name` when there is no
    /// table qualifier.
    fn qualified_name(&self) -> String {
        match self.table.as_deref() {
            Some(table) => format!("{table}.{}", self.name),
            None => self.name.clone(),
        }
    }
}
915
916/// Collect all column references from an expression.
917fn collect_expr_columns(expr: &Expr) -> Vec<ColumnReference> {
918    let mut columns = Vec::new();
919    
920    expr.walk(&mut |e| {
921        if let Expr::Column { table, name, .. } = e {
922            columns.push(ColumnReference {
923                table: table.clone(),
924                name: name.clone(),
925            });
926            return false; // Don't recurse into column nodes
927        }
928        // Don't descend into subqueries
929        !matches!(e, Expr::Subquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. })
930    });
931    
932    columns
933}
934
935/// Extract SELECT columns from a statement.
936fn extract_select_columns(stmt: &Statement) -> Option<Vec<SelectItem>> {
937    match stmt {
938        Statement::Select(sel) => Some(sel.columns.clone()),
939        Statement::SetOperation(set_op) => extract_select_columns(&set_op.left),
940        Statement::CreateView(cv) => extract_select_columns(&cv.query),
941        _ => None,
942    }
943}
944
945/// Get the output name of an expression.
946fn expr_output_name(expr: &Expr) -> &str {
947    match expr {
948        Expr::Column { name, .. } => name,
949        Expr::Alias { name, .. } => name,
950        _ => "",
951    }
952}
953
954/// Convert an expression to a displayable name.
955fn expr_to_name(expr: &Expr, trim_qualifiers: bool) -> String {
956    match expr {
957        Expr::Column { table, name, .. } => {
958            if trim_qualifiers {
959                name.clone()
960            } else if let Some(t) = table {
961                format!("{}.{}", t, name)
962            } else {
963                name.clone()
964            }
965        }
966        Expr::Alias { name, .. } => name.clone(),
967        Expr::Function { name, .. } => format!("{}(...)", name),
968        Expr::BinaryOp { op, .. } => format!("({:?})", op),
969        Expr::Cast { data_type, .. } => format!("CAST AS {:?}", data_type),
970        _ => "expr".to_string(),
971    }
972}
973
/// Parse a column reference string into `(name, optional_table_qualifier)`.
///
/// The split happens at the last `.`: everything before it becomes the
/// qualifier and everything after it the column name, so `"db.t.col"` yields
/// `("col", Some("db.t"))`. A string without a `.` is returned unchanged as the
/// name with no qualifier.
///
/// Note the return order: the column name comes first, the qualifier second.
fn parse_column_ref(column: &str) -> (String, Option<String>) {
    match column.rsplit_once('.') {
        Some((table, name)) => (name.to_string(), Some(table.to_string())),
        None => (column.to_string(), None),
    }
}
984
/// Check whether two column names refer to the same column.
///
/// The comparison ignores ASCII case only and does not depend on the dialect;
/// non-ASCII characters must match exactly.
fn matches_column_name(item: &str, target: &str) -> bool {
    item.eq_ignore_ascii_case(target)
}
989
990/// Normalize an identifier name for the given dialect.
991fn normalize_name(name: &str, dialect: Dialect) -> String {
992    crate::schema::normalize_identifier(name, dialect)
993}
994
995/// Check if a SELECT item outputs a column with the given name.
996fn select_item_has_column(item: &SelectItem, name: &str) -> bool {
997    match item {
998        SelectItem::Expr { expr, alias } => {
999            let item_name = alias.as_ref().map(String::as_str).unwrap_or_else(|| {
1000                expr_output_name(expr)
1001            });
1002            matches_column_name(item_name, name)
1003        }
1004        SelectItem::Wildcard => true, // Could match any column
1005        SelectItem::QualifiedWildcard { .. } => true,
1006    }
1007}
1008
/// Escape a string for DOT format.
///
/// Rewrites the three characters that would break a double-quoted DOT string:
/// a backslash becomes `\\`, a double quote becomes `\"`, and a newline
/// becomes the two-character sequence `\n`. All other characters are copied
/// through unchanged.
fn escape_dot(s: &str) -> String {
    // Single pass; the output is at least as long as the input.
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}
1015
/// Escape a string for Mermaid format.
///
/// Substitutes characters one-for-one: a double quote becomes a single quote,
/// a newline becomes a space, and square brackets become parentheses. All
/// other characters are copied through unchanged.
fn escape_mermaid(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '"' => '\'',
            '\n' => ' ',
            '[' => '(',
            ']' => ')',
            other => other,
        })
        .collect()
}
1023
// ═══════════════════════════════════════════════════════════════════════
// Tests
// ═══════════════════════════════════════════════════════════════════════
1027
#[cfg(test)]
mod tests {
    //! Unit tests for column lineage. Every test parses ANSI SQL and traces a
    //! single output column against the small fixture schema built by
    //! `test_schema`.

    use super::*;
    use crate::parser::parse;

    /// Lineage configuration used by most tests: ANSI dialect, otherwise
    /// whatever `LineageConfig::new` sets up.
    fn test_config() -> LineageConfig {
        LineageConfig::new(Dialect::Ansi)
    }

    /// Fixture schema with three tables: `t(a, b, c)`,
    /// `users(id, name, email)` and `orders(id, user_id, amount)`.
    fn test_schema() -> MappingSchema {
        let mut schema = MappingSchema::new(Dialect::Ansi);
        schema
            .add_table(
                &["t"],
                vec![
                    ("a".to_string(), DataType::Int),
                    ("b".to_string(), DataType::Int),
                    ("c".to_string(), DataType::Int),
                ],
            )
            .unwrap();
        schema
            .add_table(
                &["users"],
                vec![
                    ("id".to_string(), DataType::Int),
                    ("name".to_string(), DataType::Varchar(Some(255))),
                    ("email".to_string(), DataType::Text),
                ],
            )
            .unwrap();
        schema
            .add_table(
                &["orders"],
                vec![
                    ("id".to_string(), DataType::Int),
                    ("user_id".to_string(), DataType::Int),
                    (
                        "amount".to_string(),
                        DataType::Decimal {
                            precision: Some(10),
                            scale: Some(2),
                        },
                    ),
                ],
            )
            .unwrap();
        schema
    }

    /// A bare column traces to exactly one upstream column in `t`.
    #[test]
    fn test_simple_column_lineage() {
        let sql = "SELECT a FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("a", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "a");
        assert_eq!(graph.node.depth, 0);
        // The root column depends on t.a
        assert_eq!(graph.node.downstream.len(), 1);
        assert_eq!(graph.node.downstream[0].source_name, Some("t".to_string()));
    }

    /// An aliased column is looked up and reported under its alias.
    #[test]
    fn test_aliased_column_lineage() {
        let sql = "SELECT a AS col_a FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("col_a", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "col_a");
        assert_eq!(graph.node.alias, Some("col_a".to_string()));
    }

    /// A binary expression depends on each of its operand columns.
    #[test]
    fn test_expression_lineage() {
        let sql = "SELECT a + b AS sum FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("sum", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "sum");
        // The sum depends on both a and b
        assert_eq!(graph.node.downstream.len(), 2);
    }

    /// Lineage follows a CTE back to the base table.
    #[test]
    fn test_cte_lineage() {
        let sql = "WITH cte AS (SELECT a FROM t) SELECT a FROM cte";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("a", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "a");
        // Should trace through the CTE
        assert!(graph.source_tables().contains(&"t".to_string()));
    }

    /// Qualified columns from both sides of a join can be traced by their
    /// unqualified output names.
    #[test]
    fn test_join_lineage() {
        let sql = "SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("name", &ast, &schema, &config).unwrap();
        assert_eq!(graph.node.name, "name");

        let graph2 = lineage("amount", &ast, &schema, &config).unwrap();
        assert_eq!(graph2.node.name, "amount");
    }

    /// A UNION contributes one downstream branch per side.
    #[test]
    fn test_union_lineage() {
        let sql = "SELECT a FROM t UNION SELECT b AS a FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("a", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "a");
        // Should have two branches
        assert_eq!(graph.node.downstream.len(), 2);
    }

    /// Asking for a column that is not in the output is a `ColumnNotFound`
    /// error.
    #[test]
    fn test_column_not_found() {
        let sql = "SELECT a FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let result = lineage("nonexistent", &ast, &schema, &config);
        assert!(matches!(result, Err(LineageError::ColumnNotFound(_))));
    }

    /// Lineage follows a derived table (subquery in FROM) back to the base
    /// table.
    #[test]
    fn test_derived_table_lineage() {
        let sql = "SELECT x FROM (SELECT a AS x FROM t) sub";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("x", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "x");
        // Should trace through the derived table to t.a
        assert!(graph.source_tables().contains(&"t".to_string()));
    }

    /// A function call depends on the column passed as its argument.
    #[test]
    fn test_function_lineage() {
        let sql = "SELECT SUM(a) AS total FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("total", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "total");
        assert_eq!(graph.node.downstream.len(), 1);
    }

    /// `lineage_sql` accepts raw SQL and records it on the resulting graph.
    #[test]
    fn test_lineage_sql_convenience() {
        let schema = test_schema();
        let config = test_config();

        let graph = lineage_sql("b", "SELECT a, b FROM t", &schema, &config).unwrap();

        assert_eq!(graph.node.name, "b");
        assert_eq!(graph.sql, Some("SELECT a, b FROM t".to_string()));
    }

    /// `source_tables` reports the name the table is referenced by in the
    /// query, which here is the alias `u` rather than `users`.
    #[test]
    fn test_source_tables() {
        let sql = "SELECT u.name, o.amount FROM users u JOIN orders o ON u.id = o.user_id";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("name", &ast, &schema, &config).unwrap();
        let tables = graph.source_tables();

        assert!(tables.contains(&"u".to_string()));
    }

    /// DOT export contains the graph header and the bottom-to-top layout
    /// directive.
    #[test]
    fn test_to_dot() {
        let sql = "SELECT a AS col FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("col", &ast, &schema, &config).unwrap();
        let dot = graph.to_dot();

        assert!(dot.contains("digraph lineage"));
        assert!(dot.contains("rankdir=BT"));
    }

    /// Mermaid export starts a bottom-to-top flowchart.
    #[test]
    fn test_to_mermaid() {
        let sql = "SELECT a AS col FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("col", &ast, &schema, &config).unwrap();
        let mermaid = graph.to_mermaid();

        assert!(mermaid.contains("flowchart BT"));
    }

    /// A CASE expression depends on the columns used in its branches.
    #[test]
    fn test_case_expression_lineage() {
        let sql = "SELECT CASE WHEN a > 0 THEN b ELSE c END AS result FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("result", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "result");
        // a, b and c all appear in the CASE; only a lower bound of two
        // upstream columns is asserted here.
        assert!(graph.node.downstream.len() >= 2);
    }

    /// COALESCE depends on every one of its arguments.
    #[test]
    fn test_coalesce_lineage() {
        let sql = "SELECT COALESCE(a, b, c) AS val FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("val", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "val");
        // Should depend on a, b, and c
        assert_eq!(graph.node.downstream.len(), 3);
    }

    /// Lineage follows a chain of CTEs (cte2 reads cte1, cte1 reads `t`).
    #[test]
    fn test_nested_cte_lineage() {
        let sql = r#"
            WITH cte1 AS (SELECT a, b FROM t),
                 cte2 AS (SELECT a + b AS sum FROM cte1)
            SELECT sum FROM cte2
        "#;
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("sum", &ast, &schema, &config).unwrap();

        assert_eq!(graph.node.name, "sum");
        // Should trace through both CTEs to t
        let sources = graph.source_tables();
        assert!(sources.contains(&"t".to_string()));
    }

    /// Iterating a node yields the root first.
    #[test]
    fn test_lineage_iterator() {
        let sql = "SELECT a + b AS sum FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = test_config();

        let graph = lineage("sum", &ast, &schema, &config).unwrap();

        let nodes: Vec<_> = graph.node.iter().collect();
        assert!(!nodes.is_empty());
        assert_eq!(nodes[0].name, "sum");
    }

    /// External source definitions supplied through the config are accepted.
    /// The assertion is intentionally loose: either success or a
    /// `ColumnNotFound` error passes.
    #[test]
    fn test_external_sources() {
        let schema = test_schema();
        let mut sources = HashMap::new();
        sources.insert("view1".to_string(), "SELECT a FROM t".to_string());

        let config = LineageConfig::new(Dialect::Ansi).with_sources(sources);

        let sql = "SELECT a FROM view1";
        let result = lineage_sql("a", sql, &schema, &config);
        // Should parse and handle external sources
        assert!(result.is_ok() || matches!(result, Err(LineageError::ColumnNotFound(_))));
    }

    /// A table-qualified column can still be traced by its bare name when
    /// qualifier trimming is disabled.
    #[test]
    fn test_qualified_column() {
        let sql = "SELECT t.a FROM t";
        let ast = parse(sql, Dialect::Ansi).unwrap();
        let schema = test_schema();
        let config = LineageConfig::new(Dialect::Ansi).with_trim_qualifiers(false);

        let graph = lineage("a", &ast, &schema, &config).unwrap();

        // With trim_qualifiers=false the name may keep its qualifier, so only
        // the presence of the column name is checked.
        assert!(graph.node.name.contains('a'));
    }
}