flowscope_core/analyzer/
visitor.rs

1//! Visitor pattern for AST traversal and lineage analysis.
2//!
3//! This module provides a visitor-based approach to traversing SQL AST nodes
4//! and building lineage graphs. It separates traversal logic (the `Visitor` trait)
5//! from analysis logic (the `LineageVisitor` implementation).
6
7use super::context::StatementContext;
8use super::expression::ExpressionAnalyzer;
9use super::helpers::{
10    alias_visibility_warning, find_cte_definition_span, find_derived_table_alias_span,
11    generate_node_id,
12};
13use super::select_analyzer::SelectAnalyzer;
14use super::Analyzer;
15use crate::types::{issue_codes, Issue, Node, NodeType, Span};
16use sqlparser::ast::{
17    self, Cte, Expr, Ident, Join, Query, Select, SetExpr, SetOperator, Statement, TableAlias,
18    TableFactor, TableWithJoins, Values,
19};
20use std::sync::Arc;
21
22/// A visitor trait for traversing the SQL AST.
23///
24/// This trait defines default behavior for visiting nodes (traversing children).
25/// Implementors can override specific methods to add custom logic.
26pub trait Visitor {
27    fn visit_statement(&mut self, statement: &Statement) {
28        match statement {
29            Statement::Query(query) => self.visit_query(query),
30            Statement::Insert(insert) => {
31                if let Some(source) = &insert.source {
32                    self.visit_query(source);
33                }
34            }
35            Statement::CreateTable(create) => {
36                if let Some(query) = &create.query {
37                    self.visit_query(query);
38                }
39            }
40            Statement::CreateView { query, .. } => self.visit_query(query),
41            _ => {}
42        }
43    }
44
45    fn visit_query(&mut self, query: &Query) {
46        if let Some(with) = &query.with {
47            for cte in &with.cte_tables {
48                self.visit_cte(cte);
49            }
50        }
51        self.visit_set_expr(&query.body);
52    }
53
54    fn visit_cte(&mut self, cte: &Cte) {
55        self.visit_query(&cte.query);
56    }
57
58    fn visit_set_expr(&mut self, set_expr: &SetExpr) {
59        match set_expr {
60            SetExpr::Select(select) => self.visit_select(select),
61            SetExpr::Query(query) => self.visit_query(query),
62            SetExpr::SetOperation { left, right, .. } => {
63                self.visit_set_expr(left);
64                self.visit_set_expr(right);
65            }
66            SetExpr::Values(values) => self.visit_values(values),
67            SetExpr::Insert(stmt) => self.visit_statement(stmt),
68            _ => {}
69        }
70    }
71
72    fn visit_select(&mut self, select: &Select) {
73        for from in &select.from {
74            self.visit_table_with_joins(from);
75        }
76    }
77
78    fn visit_table_with_joins(&mut self, table: &TableWithJoins) {
79        self.visit_table_factor(&table.relation);
80        for join in &table.joins {
81            self.visit_join(join);
82        }
83    }
84
85    fn visit_table_factor(&mut self, table: &TableFactor) {
86        match table {
87            TableFactor::Derived { subquery, .. } => self.visit_query(subquery),
88            TableFactor::NestedJoin {
89                table_with_joins, ..
90            } => self.visit_table_with_joins(table_with_joins),
91            _ => {}
92        }
93    }
94
95    fn visit_join(&mut self, join: &Join) {
96        self.visit_table_factor(&join.relation);
97    }
98
99    fn visit_values(&mut self, values: &Values) {
100        for row in &values.rows {
101            for expr in row {
102                self.visit_expr(expr);
103            }
104        }
105    }
106
107    fn visit_expr(&mut self, _expr: &Expr) {}
108}
109
110/// Visitor implementation that builds the lineage graph.
111pub(crate) struct LineageVisitor<'a, 'b> {
112    pub(crate) analyzer: &'a mut Analyzer<'b>,
113    pub(crate) ctx: &'a mut StatementContext,
114    pub(crate) target_node: Option<String>,
115}
116
117impl<'a, 'b> LineageVisitor<'a, 'b> {
118    pub(crate) fn new(
119        analyzer: &'a mut Analyzer<'b>,
120        ctx: &'a mut StatementContext,
121        target_node: Option<String>,
122    ) -> Self {
123        Self {
124            analyzer,
125            ctx,
126            target_node,
127        }
128    }
129
130    #[inline]
131    pub fn target_from_arc(arc: Option<&Arc<str>>) -> Option<String> {
132        arc.map(|s| s.to_string())
133    }
134
135    pub fn set_target_node(&mut self, target: Option<String>) {
136        self.target_node = target;
137    }
138
139    pub fn set_last_operation(&mut self, op: Option<String>) {
140        self.ctx.last_operation = op;
141    }
142
143    /// Locates a span using the provided finder function.
144    ///
145    /// Handles the common logic for span searching:
146    /// - Uses statement-local SQL when available, full request SQL otherwise
147    /// - Adjusts span coordinates from statement-local to request-global
148    /// - Updates the span search cursor after successful matches
149    fn locate_span<F>(&mut self, identifier: &str, finder: F) -> Option<Span>
150    where
151        F: Fn(&str, &str, usize) -> Option<Span>,
152    {
153        let search_start = self.ctx.span_search_cursor;
154
155        let (sql, offset) = if let Some(source) = &self.analyzer.current_statement_source {
156            (
157                &source.sql[source.range.start..source.range.end],
158                source.range.start,
159            )
160        } else {
161            (self.analyzer.request.sql.as_str(), 0)
162        };
163
164        let span = finder(sql, identifier, search_start)?;
165
166        // Invariant: cursor should only move forward (left-to-right traversal)
167        debug_assert!(
168            span.end >= self.ctx.span_search_cursor,
169            "Span cursor moved backward: {} -> {} (identifier: '{}')",
170            self.ctx.span_search_cursor,
171            span.end,
172            identifier
173        );
174
175        self.ctx.span_search_cursor = span.end;
176        Some(Span::new(offset + span.start, offset + span.end))
177    }
178
179    fn locate_cte_definition_span(&mut self, identifier: &str) -> Option<Span> {
180        self.locate_span(identifier, find_cte_definition_span)
181    }
182
183    fn locate_derived_alias_span(&mut self, identifier: &str) -> Option<Span> {
184        self.locate_span(identifier, find_derived_table_alias_span)
185    }
186
187    /// Extract the expression from a JoinOperator's constraint, if any.
188    fn extract_join_constraint_expr(op: &ast::JoinOperator) -> Option<&Expr> {
189        let constraint = match op {
190            ast::JoinOperator::Join(c)
191            | ast::JoinOperator::Inner(c)
192            | ast::JoinOperator::Left(c)
193            | ast::JoinOperator::LeftOuter(c)
194            | ast::JoinOperator::Right(c)
195            | ast::JoinOperator::RightOuter(c)
196            | ast::JoinOperator::FullOuter(c)
197            | ast::JoinOperator::Semi(c)
198            | ast::JoinOperator::LeftSemi(c)
199            | ast::JoinOperator::RightSemi(c)
200            | ast::JoinOperator::Anti(c)
201            | ast::JoinOperator::LeftAnti(c)
202            | ast::JoinOperator::RightAnti(c)
203            | ast::JoinOperator::StraightJoin(c) => Some(c),
204            ast::JoinOperator::AsOf { constraint, .. } => Some(constraint),
205            ast::JoinOperator::CrossJoin(_)
206            | ast::JoinOperator::CrossApply
207            | ast::JoinOperator::OuterApply => None,
208        };
209
210        constraint.and_then(|c| match c {
211            ast::JoinConstraint::On(expr) => Some(expr),
212            _ => None,
213        })
214    }
215
216    /// Extract and record implied foreign key relationships from a JOIN condition.
217    ///
218    /// For equality expressions like `t1.a = t2.b`, we record **both directions**
219    /// as potential FK relationships. This is intentional because:
220    ///
221    /// 1. **No authoritative direction**: From syntax alone, we cannot determine
222    ///    which column is the FK and which is the referenced PK. The true direction
223    ///    depends on schema knowledge we may not have.
224    ///
225    /// 2. **Consumer deduplication**: Downstream consumers (like the React SchemaView)
226    ///    normalize and deduplicate reciprocal FK edges before rendering, so storing
227    ///    both directions doesn't create duplicate visual edges.
228    ///
229    /// 3. **Heuristic accuracy**: Recording both ensures we capture the relationship
230    ///    regardless of how the user wrote the JOIN condition (`a.id = b.a_id` vs
231    ///    `b.a_id = a.id`).
232    ///
233    /// Self-joins are excluded since `t.a = t.b` within the same table doesn't
234    /// imply a cross-table FK relationship (see [`StatementContext::record_implied_foreign_key`]).
235    fn record_join_fk_relationships(&mut self, expr: &Expr) {
236        use sqlparser::ast::BinaryOperator;
237
238        match expr {
239            Expr::BinaryOp { left, op, right } if *op == BinaryOperator::And => {
240                // Recurse into AND conditions (common in multi-column joins)
241                self.record_join_fk_relationships(left);
242                self.record_join_fk_relationships(right);
243            }
244            Expr::BinaryOp { left, op, right } if *op == BinaryOperator::Eq => {
245                self.record_equality_fk(left, right);
246            }
247            Expr::Nested(inner) => self.record_join_fk_relationships(inner),
248            _ => {}
249        }
250    }
251
252    /// Record FK relationships from an equality expression (t1.a = t2.b).
253    fn record_equality_fk(&mut self, left: &Expr, right: &Expr) {
254        let Some(left_ref) = Self::extract_column_ref(left) else {
255            return;
256        };
257        let Some(right_ref) = Self::extract_column_ref(right) else {
258            return;
259        };
260
261        let left_table = left_ref
262            .0
263            .as_ref()
264            .and_then(|t| self.resolve_table_alias(Some(t)));
265        let right_table = right_ref
266            .0
267            .as_ref()
268            .and_then(|t| self.resolve_table_alias(Some(t)));
269
270        let (Some(left_table), Some(right_table)) = (left_table, right_table) else {
271            return;
272        };
273
274        // Record FK in both directions (see record_join_fk_relationships docs for rationale)
275        self.ctx
276            .record_implied_foreign_key(&left_table, &left_ref.1, &right_table, &right_ref.1);
277        self.ctx
278            .record_implied_foreign_key(&right_table, &right_ref.1, &left_table, &left_ref.1);
279    }
280
281    /// Extract a (table, column) pair from a simple column reference expression.
282    fn extract_column_ref(expr: &Expr) -> Option<(Option<String>, String)> {
283        match expr {
284            Expr::Identifier(ident) => Some((None, ident.value.clone())),
285            Expr::CompoundIdentifier(idents) if idents.len() == 2 => {
286                Some((Some(idents[0].value.clone()), idents[1].value.clone()))
287            }
288            Expr::CompoundIdentifier(idents) if idents.len() >= 2 => {
289                // schema.table.column - take last two parts
290                let len = idents.len();
291                Some((
292                    Some(idents[len - 2].value.clone()),
293                    idents[len - 1].value.clone(),
294                ))
295            }
296            _ => None,
297        }
298    }
299
300    pub fn add_source_table(&mut self, table_name: &str) -> Option<String> {
301        self.analyzer
302            .add_source_table(self.ctx, table_name, self.target_node.as_deref())
303    }
304
305    pub fn analyze_dml_target(
306        &mut self,
307        table_name: &str,
308        alias: Option<&TableAlias>,
309    ) -> Option<(String, Arc<str>)> {
310        let canonical_res = self.analyzer.add_source_table(self.ctx, table_name, None);
311        let canonical = canonical_res
312            .clone()
313            .unwrap_or_else(|| self.analyzer.normalize_table_name(table_name));
314
315        if let (Some(a), Some(canonical_name)) = (alias, canonical_res) {
316            self.ctx
317                .table_aliases
318                .insert(a.name.to_string(), canonical_name);
319        }
320
321        let node_id = self
322            .ctx
323            .table_node_ids
324            .get(&canonical)
325            .cloned()
326            .unwrap_or_else(|| self.analyzer.relation_node_id(&canonical));
327
328        self.analyzer
329            .tracker
330            .record_produced(&canonical, self.ctx.statement_index);
331        self.analyzer
332            .add_table_columns_from_schema(self.ctx, &canonical, &node_id);
333
334        Some((canonical, node_id))
335    }
336
337    pub fn analyze_dml_target_factor(&mut self, table: &TableFactor) -> Option<Arc<str>> {
338        if let TableFactor::Table { name, alias, .. } = table {
339            let table_name = name.to_string();
340            self.analyze_dml_target(&table_name, alias.as_ref())
341                .map(|(_, node_id)| node_id)
342        } else {
343            self.visit_table_factor(table);
344            None
345        }
346    }
347
348    pub fn analyze_dml_target_from_table_with_joins(
349        &mut self,
350        table: &TableWithJoins,
351    ) -> Option<Arc<str>> {
352        if let TableFactor::Table { name, alias, .. } = &table.relation {
353            let table_name = name.to_string();
354            self.analyze_dml_target(&table_name, alias.as_ref())
355                .map(|(_, node_id)| node_id)
356        } else {
357            self.visit_table_with_joins(table);
358            None
359        }
360    }
361
362    pub fn register_aliases_in_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
363        self.register_aliases_in_table_factor(&table_with_joins.relation);
364        for join in &table_with_joins.joins {
365            self.register_aliases_in_table_factor(&join.relation);
366        }
367    }
368
369    fn register_aliases_in_table_factor(&mut self, table_factor: &TableFactor) {
370        match table_factor {
371            TableFactor::Table {
372                name,
373                alias: Some(a),
374                ..
375            } => {
376                let canonical = self
377                    .analyzer
378                    .canonicalize_table_reference(&name.to_string())
379                    .canonical;
380                self.ctx.table_aliases.insert(a.name.to_string(), canonical);
381            }
382            TableFactor::Derived { alias: Some(a), .. } => {
383                self.ctx.subquery_aliases.insert(a.name.to_string());
384            }
385            TableFactor::NestedJoin {
386                table_with_joins, ..
387            } => {
388                self.register_aliases_in_table_with_joins(table_with_joins);
389            }
390            _ => {}
391        }
392    }
393
394    pub fn resolve_table_alias(&self, alias: Option<&str>) -> Option<String> {
395        self.analyzer.resolve_table_alias(self.ctx, alias)
396    }
397
398    pub(super) fn canonicalize_table_reference(&self, name: &str) -> super::TableResolution {
399        self.analyzer.canonicalize_table_reference(name)
400    }
401
402    /// Extracts table identifiers from an expression (best-effort for unsupported constructs).
403    ///
404    /// Used for PIVOT, UNPIVOT, and table functions where full semantic analysis is not
405    /// implemented. This may produce false positives (column references mistaken for tables)
406    /// or false negatives (table references in unhandled expression types).
407    fn extract_identifiers_from_expr(&mut self, expr: &Expr) {
408        match expr {
409            Expr::Identifier(ident) => {
410                self.try_add_identifier_as_table(std::slice::from_ref(ident));
411            }
412            Expr::CompoundIdentifier(idents) => {
413                self.try_add_identifier_as_table(idents);
414            }
415            Expr::Function(func) => {
416                if let ast::FunctionArguments::List(arg_list) = &func.args {
417                    for arg in &arg_list.args {
418                        if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) = arg {
419                            self.extract_identifiers_from_expr(e);
420                        }
421                    }
422                }
423            }
424            Expr::BinaryOp { left, right, .. } => {
425                self.extract_identifiers_from_expr(left);
426                self.extract_identifiers_from_expr(right);
427            }
428            Expr::UnaryOp { expr, .. } => {
429                self.extract_identifiers_from_expr(expr);
430            }
431            Expr::Nested(e) => {
432                self.extract_identifiers_from_expr(e);
433            }
434            Expr::InList { expr, list, .. } => {
435                self.extract_identifiers_from_expr(expr);
436                for e in list {
437                    self.extract_identifiers_from_expr(e);
438                }
439            }
440            Expr::Case {
441                operand,
442                conditions,
443                else_result,
444                ..
445            } => {
446                if let Some(op) = operand {
447                    self.extract_identifiers_from_expr(op);
448                }
449                for case_when in conditions {
450                    self.extract_identifiers_from_expr(&case_when.condition);
451                    self.extract_identifiers_from_expr(&case_when.result);
452                }
453                if let Some(else_r) = else_result {
454                    self.extract_identifiers_from_expr(else_r);
455                }
456            }
457            _ => {}
458        }
459    }
460
461    fn try_add_identifier_as_table(&mut self, idents: &[Ident]) {
462        if idents.is_empty() {
463            return;
464        }
465
466        let name = idents
467            .iter()
468            .map(|i| i.value.as_str())
469            .collect::<Vec<_>>()
470            .join(".");
471
472        let resolution = self.analyzer.canonicalize_table_reference(&name);
473        if resolution.matched_schema {
474            self.add_source_table(&name);
475        }
476    }
477
478    /// Emits a warning for unsupported alias usage in a clause.
479    fn emit_alias_warning(&mut self, clause_name: &str, alias_name: &str) {
480        let dialect = self.analyzer.request.dialect;
481        let statement_index = self.ctx.statement_index;
482        self.analyzer.issues.push(alias_visibility_warning(
483            dialect,
484            clause_name,
485            alias_name,
486            statement_index,
487        ));
488    }
489
490    /// Analyzes ORDER BY clause for alias visibility warnings.
491    ///
492    /// Checks if aliases from the SELECT list are used in ORDER BY expressions
493    /// and emits warnings for dialects that don't support alias references in ORDER BY.
494    fn analyze_order_by(&mut self, order_by: &ast::OrderBy) {
495        let dialect = self.analyzer.request.dialect;
496
497        let order_exprs = match &order_by.kind {
498            ast::OrderByKind::Expressions(exprs) => exprs,
499            ast::OrderByKind::All(_) => return,
500        };
501
502        // Check for alias usage in ORDER BY clause
503        if !dialect.alias_in_order_by() {
504            for order_expr in order_exprs {
505                let identifiers = ExpressionAnalyzer::extract_simple_identifiers(&order_expr.expr);
506                for ident in &identifiers {
507                    let normalized_ident = self.analyzer.normalize_identifier(ident);
508                    if let Some(alias_name) = self
509                        .ctx
510                        .output_columns
511                        .iter()
512                        .find(|c| self.analyzer.normalize_identifier(&c.name) == normalized_ident)
513                        .map(|c| c.name.clone())
514                    {
515                        self.emit_alias_warning("ORDER BY", &alias_name);
516                    }
517                }
518            }
519        }
520
521        // Also analyze any subqueries in ORDER BY expressions
522        for order_expr in order_exprs {
523            let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
524            ea.analyze(&order_expr.expr);
525        }
526    }
527}
528
529impl<'a, 'b> Visitor for LineageVisitor<'a, 'b> {
530    fn visit_query(&mut self, query: &Query) {
531        if let Some(with) = &query.with {
532            let mut cte_ids: Vec<(String, Arc<str>)> = Vec::new();
533            for cte in &with.cte_tables {
534                let cte_name = cte.alias.name.to_string();
535                let cte_span = self.locate_cte_definition_span(&cte_name);
536                let cte_id = self.ctx.add_node(Node {
537                    id: generate_node_id("cte", &cte_name),
538                    node_type: NodeType::Cte,
539                    label: cte_name.clone().into(),
540                    qualified_name: Some(cte_name.clone().into()),
541                    expression: None,
542                    span: cte_span,
543                    metadata: None,
544                    resolution_source: None,
545                    filters: Vec::new(),
546                    join_type: None,
547                    join_condition: None,
548                    aggregation: None,
549                });
550
551                self.ctx
552                    .cte_definitions
553                    .insert(cte_name.clone(), cte_id.clone());
554                self.ctx
555                    .cte_node_to_name
556                    .insert(cte_id.clone(), cte_name.clone());
557                self.analyzer.tracker.record_cte(&cte_name);
558                cte_ids.push((cte_name, cte_id));
559            }
560
561            for (cte, (_, cte_id)) in with.cte_tables.iter().zip(cte_ids.iter()) {
562                let projection_checkpoint = self.ctx.projection_checkpoint();
563                let mut cte_visitor =
564                    LineageVisitor::new(self.analyzer, self.ctx, Some(cte_id.to_string()));
565                cte_visitor.visit_query(&cte.query);
566                let columns = self.ctx.take_output_columns_since(projection_checkpoint);
567                self.ctx
568                    .aliased_subquery_columns
569                    .insert(cte.alias.name.to_string(), columns);
570            }
571        }
572        self.visit_set_expr(&query.body);
573
574        // Analyze ORDER BY for alias visibility warnings
575        if let Some(order_by) = &query.order_by {
576            self.analyze_order_by(order_by);
577        }
578    }
579
580    fn visit_set_expr(&mut self, set_expr: &SetExpr) {
581        match set_expr {
582            SetExpr::Select(select) => self.visit_select(select),
583            SetExpr::Query(query) => self.visit_query(query),
584            SetExpr::SetOperation {
585                op, left, right, ..
586            } => {
587                let op_name = match op {
588                    SetOperator::Union => "UNION",
589                    SetOperator::Intersect => "INTERSECT",
590                    SetOperator::Except => "EXCEPT",
591                    SetOperator::Minus => "MINUS",
592                };
593                self.visit_set_expr(left);
594                self.visit_set_expr(right);
595                if self.target_node.is_some() {
596                    self.ctx.last_operation = Some(op_name.to_string());
597                }
598            }
599            SetExpr::Values(values) => self.visit_values(values),
600            SetExpr::Insert(insert_stmt) => {
601                let Statement::Insert(insert) = insert_stmt else {
602                    return;
603                };
604                let target_name = insert.table.to_string();
605                self.add_source_table(&target_name);
606            }
607            SetExpr::Table(tbl) => {
608                let name = tbl
609                    .table_name
610                    .as_ref()
611                    .map(|n| n.to_string())
612                    .unwrap_or_default();
613                if !name.is_empty() {
614                    self.add_source_table(&name);
615                }
616            }
617            _ => {}
618        }
619    }
620
621    fn visit_select(&mut self, select: &Select) {
622        self.ctx.push_scope();
623        for table_with_joins in &select.from {
624            self.visit_table_with_joins(table_with_joins);
625        }
626        if self.analyzer.column_lineage_enabled {
627            let output_node = self.ctx.output_node_id().map(|node_id| node_id.to_string());
628            let target_node = self.target_node.clone().or(output_node);
629            let mut select_analyzer = SelectAnalyzer::new(self.analyzer, self.ctx, target_node);
630            select_analyzer.analyze(select);
631        }
632        self.ctx.pop_scope();
633    }
634
635    fn visit_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
636        self.visit_table_factor(&table_with_joins.relation);
637        for join in &table_with_joins.joins {
638            let (join_type, join_condition) = Analyzer::convert_join_operator(&join.join_operator);
639            self.ctx.current_join_info.join_type = join_type;
640            self.ctx.current_join_info.join_condition = join_condition;
641            self.ctx.last_operation = Analyzer::join_type_to_operation(join_type);
642            self.visit_table_factor(&join.relation);
643
644            // Analyze JOIN condition expression to capture column references for implied schema
645            if let Some(expr) = Self::extract_join_constraint_expr(&join.join_operator) {
646                let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
647                ea.analyze(expr);
648
649                // Extract implied FK relationships from equality conditions
650                self.record_join_fk_relationships(expr);
651            }
652
653            self.ctx.current_join_info.join_type = None;
654            self.ctx.current_join_info.join_condition = None;
655        }
656    }
657
658    fn visit_table_factor(&mut self, table_factor: &TableFactor) {
659        match table_factor {
660            TableFactor::Table { name, alias, .. } => {
661                let table_name = name.to_string();
662                let canonical = self.add_source_table(&table_name);
663                if let (Some(a), Some(canonical_name)) = (alias, canonical) {
664                    self.ctx
665                        .register_alias_in_scope(a.name.to_string(), canonical_name);
666                }
667            }
668            TableFactor::Derived {
669                subquery, alias, ..
670            } => {
671                // A derived table (subquery in a FROM clause) is treated like a temporary CTE.
672                // We create a node for it in the graph, analyze its subquery to determine its
673                // output columns, and then register its alias and columns in the current scope
674                // so the outer query can reference it.
675                let alias_name = alias.as_ref().map(|a| a.name.to_string());
676                let projection_checkpoint = self.ctx.projection_checkpoint();
677                let derived_span = alias_name
678                    .as_ref()
679                    .and_then(|name| self.locate_derived_alias_span(name));
680
681                // We model derived tables as CTEs in the graph since they are conceptually
682                // similar: both are ephemeral, named result sets scoped to a single query.
683                // This avoids introducing a separate NodeType for a very similar concept.
684                let derived_node_id = alias_name.as_ref().map(|name| {
685                    self.ctx.add_node(Node {
686                        id: generate_node_id("derived", name),
687                        node_type: NodeType::Cte,
688                        label: name.clone().into(),
689                        qualified_name: Some(name.clone().into()),
690                        expression: None,
691                        span: derived_span,
692                        metadata: None,
693                        resolution_source: None,
694                        filters: Vec::new(),
695                        join_type: None,
696                        join_condition: None,
697                        aggregation: None,
698                    })
699                });
700
701                if let (Some(name), Some(node_id)) = (alias_name.as_ref(), derived_node_id.as_ref())
702                {
703                    // Track reverse mapping for wildcard inference without polluting
704                    // the global CTE definition map (which stores real WITH items).
705                    self.ctx
706                        .cte_node_to_name
707                        .insert(node_id.clone(), name.clone());
708                }
709
710                let mut derived_visitor = LineageVisitor::new(
711                    self.analyzer,
712                    self.ctx,
713                    derived_node_id.as_ref().map(|id| id.to_string()),
714                );
715                derived_visitor.visit_query(subquery);
716                let columns = self.ctx.take_output_columns_since(projection_checkpoint);
717
718                if let (Some(name), Some(node_id)) = (alias_name, derived_node_id) {
719                    self.ctx
720                        .register_table_in_scope(name.clone(), node_id.clone());
721                    self.ctx.register_alias_in_scope(name.clone(), name.clone());
722                    self.ctx.aliased_subquery_columns.insert(name, columns);
723                }
724            }
725            TableFactor::NestedJoin {
726                table_with_joins, ..
727            } => {
728                self.visit_table_with_joins(table_with_joins);
729            }
730            TableFactor::TableFunction { expr, alias, .. } => {
731                self.extract_identifiers_from_expr(expr);
732                if let Some(a) = alias {
733                    self.ctx
734                        .register_subquery_alias_in_scope(a.name.to_string());
735                }
736                self.analyzer.issues.push(
737                    Issue::info(
738                        issue_codes::UNSUPPORTED_SYNTAX,
739                        "Table function lineage extracted with best-effort identifier matching",
740                    )
741                    .with_statement(self.ctx.statement_index),
742                );
743            }
744            TableFactor::Pivot {
745                table,
746                aggregate_functions,
747                value_column,
748                value_source,
749                alias,
750                ..
751            } => {
752                self.visit_table_factor(table);
753                for func in aggregate_functions {
754                    self.extract_identifiers_from_expr(&func.expr);
755                }
756                for expr in value_column {
757                    self.extract_identifiers_from_expr(expr);
758                }
759                match value_source {
760                    ast::PivotValueSource::List(values) => {
761                        for value in values {
762                            self.extract_identifiers_from_expr(&value.expr);
763                        }
764                    }
765                    ast::PivotValueSource::Any(_) => {}
766                    ast::PivotValueSource::Subquery(q) => {
767                        self.visit_query(q);
768                    }
769                }
770                if let Some(a) = alias {
771                    self.ctx
772                        .register_subquery_alias_in_scope(a.name.to_string());
773                }
774                self.analyzer.issues.push(
775                    Issue::warning(
776                        issue_codes::UNSUPPORTED_SYNTAX,
777                        "PIVOT lineage extracted with best-effort identifier matching",
778                    )
779                    .with_statement(self.ctx.statement_index),
780                );
781            }
782            TableFactor::Unpivot {
783                table,
784                columns,
785                alias,
786                ..
787            } => {
788                self.visit_table_factor(table);
789                for col in columns {
790                    self.extract_identifiers_from_expr(&col.expr);
791                }
792                if let Some(a) = alias {
793                    self.ctx
794                        .register_subquery_alias_in_scope(a.name.to_string());
795                }
796                self.analyzer.issues.push(
797                    Issue::warning(
798                        issue_codes::UNSUPPORTED_SYNTAX,
799                        "UNPIVOT lineage extracted with best-effort identifier matching",
800                    )
801                    .with_statement(self.ctx.statement_index),
802                );
803            }
804            TableFactor::UNNEST {
805                array_exprs, alias, ..
806            } => {
807                // UNNEST expands array columns into rows. Extract column references
808                // from the array expressions and resolve them to their source tables.
809                for expr in array_exprs {
810                    let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
811                    let column_refs = ea.extract_column_refs_with_warning(expr);
812                    for col_ref in &column_refs {
813                        // Resolve the column to its source table and add it as a data source
814                        if let Some(table_canonical) = self.analyzer.resolve_column_table(
815                            self.ctx,
816                            col_ref.table.as_deref(),
817                            &col_ref.column,
818                        ) {
819                            self.add_source_table(&table_canonical);
820                        }
821                    }
822                }
823                if let Some(a) = alias {
824                    self.ctx
825                        .register_subquery_alias_in_scope(a.name.to_string());
826                }
827            }
828            _ => {}
829        }
830    }
831
832    fn visit_values(&mut self, values: &Values) {
833        let mut expr_analyzer = ExpressionAnalyzer::new(self.analyzer, self.ctx);
834        for row in &values.rows {
835            for expr in row {
836                expr_analyzer.analyze(expr);
837            }
838        }
839    }
840}