Skip to main content

flowscope_core/analyzer/
visitor.rs

//! Visitor pattern for AST traversal and lineage analysis.
//!
//! This module provides a visitor-based approach to traversing SQL AST nodes
//! and building lineage graphs. It separates traversal logic (the `Visitor` trait)
//! from analysis logic (the `LineageVisitor` implementation).
6
7use super::context::StatementContext;
8use super::expression::ExpressionAnalyzer;
9use super::helpers::{
10    alias_visibility_warning, find_cte_body_span, find_cte_definition_span,
11    find_derived_table_alias_span, generate_statement_scoped_node_id,
12};
13use super::select_analyzer::SelectAnalyzer;
14use super::Analyzer;
15use crate::generated::is_value_table_function;
16use crate::types::{issue_codes, Issue, Node, NodeType, Span};
17use sqlparser::ast::{
18    self, CreateView, Cte, Expr, Ident, Join, Query, Select, SetExpr, SetOperator, Statement,
19    TableAlias, TableFactor, TableWithJoins, Values,
20};
21use std::sync::Arc;
22
23/// A visitor trait for traversing the SQL AST.
24///
25/// This trait defines default behavior for visiting nodes (traversing children).
26/// Implementors can override specific methods to add custom logic.
27pub trait Visitor {
28    fn visit_statement(&mut self, statement: &Statement) {
29        match statement {
30            Statement::Query(query) => self.visit_query(query),
31            Statement::Insert(insert) => {
32                if let Some(source) = &insert.source {
33                    self.visit_query(source);
34                }
35            }
36            Statement::CreateTable(create) => {
37                if let Some(query) = &create.query {
38                    self.visit_query(query);
39                }
40            }
41            Statement::CreateView(CreateView { query, .. }) => self.visit_query(query),
42            _ => {}
43        }
44    }
45
46    fn visit_query(&mut self, query: &Query) {
47        if let Some(with) = &query.with {
48            for cte in &with.cte_tables {
49                self.visit_cte(cte);
50            }
51        }
52        self.visit_set_expr(&query.body);
53    }
54
55    fn visit_cte(&mut self, cte: &Cte) {
56        self.visit_query(&cte.query);
57    }
58
59    fn visit_set_expr(&mut self, set_expr: &SetExpr) {
60        match set_expr {
61            SetExpr::Select(select) => self.visit_select(select),
62            SetExpr::Query(query) => self.visit_query(query),
63            SetExpr::SetOperation { left, right, .. } => {
64                self.visit_set_expr(left);
65                self.visit_set_expr(right);
66            }
67            SetExpr::Values(values) => self.visit_values(values),
68            SetExpr::Insert(stmt) => self.visit_statement(stmt),
69            _ => {}
70        }
71    }
72
73    fn visit_select(&mut self, select: &Select) {
74        for from in &select.from {
75            self.visit_table_with_joins(from);
76        }
77    }
78
79    fn visit_table_with_joins(&mut self, table: &TableWithJoins) {
80        self.visit_table_factor(&table.relation);
81        for join in &table.joins {
82            self.visit_join(join);
83        }
84    }
85
86    fn visit_table_factor(&mut self, table: &TableFactor) {
87        match table {
88            TableFactor::Derived { subquery, .. } => self.visit_query(subquery),
89            TableFactor::NestedJoin {
90                table_with_joins, ..
91            } => self.visit_table_with_joins(table_with_joins),
92            _ => {}
93        }
94    }
95
96    fn visit_join(&mut self, join: &Join) {
97        self.visit_table_factor(&join.relation);
98    }
99
100    fn visit_values(&mut self, values: &Values) {
101        for row in &values.rows {
102            for expr in row {
103                self.visit_expr(expr);
104            }
105        }
106    }
107
108    fn visit_expr(&mut self, _expr: &Expr) {}
109}
110
111/// Visitor implementation that builds the lineage graph.
112pub(crate) struct LineageVisitor<'a, 'b> {
113    pub(crate) analyzer: &'a mut Analyzer<'b>,
114    pub(crate) ctx: &'a mut StatementContext,
115    pub(crate) target_node: Option<String>,
116}
117
118impl<'a, 'b> LineageVisitor<'a, 'b> {
119    pub(crate) fn new(
120        analyzer: &'a mut Analyzer<'b>,
121        ctx: &'a mut StatementContext,
122        target_node: Option<String>,
123    ) -> Self {
124        Self {
125            analyzer,
126            ctx,
127            target_node,
128        }
129    }
130
131    #[inline]
132    pub fn target_from_arc(arc: Option<&Arc<str>>) -> Option<String> {
133        arc.map(|s| s.to_string())
134    }
135
136    pub fn set_target_node(&mut self, target: Option<String>) {
137        self.target_node = target;
138    }
139
140    pub fn set_last_operation(&mut self, op: Option<String>) {
141        self.ctx.last_operation = op;
142    }
143
144    /// Locates a span using the provided finder function.
145    ///
146    /// Handles the common logic for span searching:
147    /// - Uses statement-local SQL when available, full request SQL otherwise
148    /// - Adjusts span coordinates from statement-local to request-global
149    /// - Updates the span search cursor after successful matches
150    fn locate_span<F>(&mut self, identifier: &str, finder: F) -> Option<Span>
151    where
152        F: Fn(&str, &str, usize) -> Option<Span>,
153    {
154        self.analyzer
155            .locate_statement_span(self.ctx, identifier, finder)
156    }
157
158    fn locate_cte_definition_span(&mut self, identifier: &str) -> Option<Span> {
159        self.locate_span(identifier, find_cte_definition_span)
160    }
161
162    fn locate_derived_alias_span(&mut self, identifier: &str) -> Option<Span> {
163        self.locate_span(identifier, find_derived_table_alias_span)
164    }
165
166    /// Extract the expression from a JoinOperator's constraint, if any.
167    fn extract_join_constraint_expr(op: &ast::JoinOperator) -> Option<&Expr> {
168        let constraint = match op {
169            ast::JoinOperator::Join(c)
170            | ast::JoinOperator::Inner(c)
171            | ast::JoinOperator::Left(c)
172            | ast::JoinOperator::LeftOuter(c)
173            | ast::JoinOperator::Right(c)
174            | ast::JoinOperator::RightOuter(c)
175            | ast::JoinOperator::FullOuter(c)
176            | ast::JoinOperator::Semi(c)
177            | ast::JoinOperator::LeftSemi(c)
178            | ast::JoinOperator::RightSemi(c)
179            | ast::JoinOperator::Anti(c)
180            | ast::JoinOperator::LeftAnti(c)
181            | ast::JoinOperator::RightAnti(c)
182            | ast::JoinOperator::StraightJoin(c) => Some(c),
183            ast::JoinOperator::AsOf { constraint, .. } => Some(constraint),
184            ast::JoinOperator::CrossJoin(_)
185            | ast::JoinOperator::CrossApply
186            | ast::JoinOperator::OuterApply => None,
187        };
188
189        constraint.and_then(|c| match c {
190            ast::JoinConstraint::On(expr) => Some(expr),
191            _ => None,
192        })
193    }
194
195    /// Extract and record implied foreign key relationships from a JOIN condition.
196    ///
197    /// For equality expressions like `t1.a = t2.b`, we record **both directions**
198    /// as potential FK relationships. This is intentional because:
199    ///
200    /// 1. **No authoritative direction**: From syntax alone, we cannot determine
201    ///    which column is the FK and which is the referenced PK. The true direction
202    ///    depends on schema knowledge we may not have.
203    ///
204    /// 2. **Consumer deduplication**: Downstream consumers (like the React SchemaView)
205    ///    normalize and deduplicate reciprocal FK edges before rendering, so storing
206    ///    both directions doesn't create duplicate visual edges.
207    ///
208    /// 3. **Heuristic accuracy**: Recording both ensures we capture the relationship
209    ///    regardless of how the user wrote the JOIN condition (`a.id = b.a_id` vs
210    ///    `b.a_id = a.id`).
211    ///
212    /// Self-joins are excluded since `t.a = t.b` within the same table doesn't
213    /// imply a cross-table FK relationship (see [`StatementContext::record_implied_foreign_key`]).
214    fn record_join_fk_relationships(&mut self, expr: &Expr) {
215        use sqlparser::ast::BinaryOperator;
216
217        match expr {
218            Expr::BinaryOp { left, op, right } if *op == BinaryOperator::And => {
219                // Recurse into AND conditions (common in multi-column joins)
220                self.record_join_fk_relationships(left);
221                self.record_join_fk_relationships(right);
222            }
223            Expr::BinaryOp { left, op, right } if *op == BinaryOperator::Eq => {
224                self.record_equality_fk(left, right);
225            }
226            Expr::Nested(inner) => self.record_join_fk_relationships(inner),
227            _ => {}
228        }
229    }
230
231    /// Record FK relationships from an equality expression (t1.a = t2.b).
232    fn record_equality_fk(&mut self, left: &Expr, right: &Expr) {
233        let Some(left_ref) = Self::extract_column_ref(left) else {
234            return;
235        };
236        let Some(right_ref) = Self::extract_column_ref(right) else {
237            return;
238        };
239
240        let left_table = left_ref
241            .0
242            .as_ref()
243            .and_then(|t| self.resolve_table_alias(Some(t)));
244        let right_table = right_ref
245            .0
246            .as_ref()
247            .and_then(|t| self.resolve_table_alias(Some(t)));
248
249        let (Some(left_table), Some(right_table)) = (left_table, right_table) else {
250            return;
251        };
252
253        // Record FK in both directions (see record_join_fk_relationships docs for rationale)
254        self.ctx
255            .record_implied_foreign_key(&left_table, &left_ref.1, &right_table, &right_ref.1);
256        self.ctx
257            .record_implied_foreign_key(&right_table, &right_ref.1, &left_table, &left_ref.1);
258    }
259
260    /// Extract a (table, column) pair from a simple column reference expression.
261    fn extract_column_ref(expr: &Expr) -> Option<(Option<String>, String)> {
262        match expr {
263            Expr::Identifier(ident) => Some((None, ident.value.clone())),
264            Expr::CompoundIdentifier(idents) if idents.len() == 2 => {
265                Some((Some(idents[0].value.clone()), idents[1].value.clone()))
266            }
267            Expr::CompoundIdentifier(idents) if idents.len() >= 2 => {
268                // schema.table.column - take last two parts
269                let len = idents.len();
270                Some((
271                    Some(idents[len - 2].value.clone()),
272                    idents[len - 1].value.clone(),
273                ))
274            }
275            _ => None,
276        }
277    }
278
279    pub fn add_source_table(&mut self, table_name: &str) -> Option<String> {
280        self.analyzer
281            .add_source_table(self.ctx, table_name, self.target_node.as_deref(), None)
282    }
283
284    pub fn add_source_table_with_alias(
285        &mut self,
286        table_name: &str,
287        alias: Option<&str>,
288    ) -> Option<String> {
289        self.analyzer
290            .add_source_table(self.ctx, table_name, self.target_node.as_deref(), alias)
291    }
292
293    pub fn analyze_dml_target(
294        &mut self,
295        table_name: &str,
296        alias: Option<&TableAlias>,
297    ) -> Option<(String, Arc<str>)> {
298        let canonical_res = self
299            .analyzer
300            .add_source_table(self.ctx, table_name, None, None);
301        let canonical = canonical_res
302            .clone()
303            .unwrap_or_else(|| self.analyzer.normalize_table_name(table_name));
304
305        if let (Some(a), Some(canonical_name)) = (alias, canonical_res) {
306            self.ctx
307                .table_aliases
308                .insert(a.name.to_string(), canonical_name);
309        }
310
311        let node_id = self
312            .ctx
313            .table_node_ids
314            .get(&canonical)
315            .cloned()
316            .unwrap_or_else(|| self.analyzer.relation_node_id(&canonical));
317
318        self.analyzer
319            .tracker
320            .record_produced(&canonical, self.ctx.statement_index);
321        self.analyzer
322            .add_table_columns_from_schema(self.ctx, &canonical, &node_id);
323
324        Some((canonical, node_id))
325    }
326
327    pub fn analyze_dml_target_factor(&mut self, table: &TableFactor) -> Option<Arc<str>> {
328        if let TableFactor::Table { name, alias, .. } = table {
329            let table_name = name.to_string();
330            self.analyze_dml_target(&table_name, alias.as_ref())
331                .map(|(_, node_id)| node_id)
332        } else {
333            self.visit_table_factor(table);
334            None
335        }
336    }
337
338    pub fn analyze_dml_target_from_table_with_joins(
339        &mut self,
340        table: &TableWithJoins,
341    ) -> Option<Arc<str>> {
342        if let TableFactor::Table { name, alias, .. } = &table.relation {
343            let table_name = name.to_string();
344            self.analyze_dml_target(&table_name, alias.as_ref())
345                .map(|(_, node_id)| node_id)
346        } else {
347            self.visit_table_with_joins(table);
348            None
349        }
350    }
351
352    pub fn register_aliases_in_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
353        self.register_aliases_in_table_factor(&table_with_joins.relation);
354        for join in &table_with_joins.joins {
355            self.register_aliases_in_table_factor(&join.relation);
356        }
357    }
358
359    fn register_aliases_in_table_factor(&mut self, table_factor: &TableFactor) {
360        match table_factor {
361            TableFactor::Table {
362                name,
363                alias: Some(a),
364                ..
365            } => {
366                let canonical = self
367                    .analyzer
368                    .canonicalize_table_reference(&name.to_string())
369                    .canonical;
370                self.ctx.table_aliases.insert(a.name.to_string(), canonical);
371            }
372            TableFactor::Derived { alias: Some(a), .. } => {
373                self.ctx.subquery_aliases.insert(a.name.to_string());
374            }
375            TableFactor::NestedJoin {
376                table_with_joins, ..
377            } => {
378                self.register_aliases_in_table_with_joins(table_with_joins);
379            }
380            _ => {}
381        }
382    }
383
384    pub fn resolve_table_alias(&self, alias: Option<&str>) -> Option<String> {
385        self.analyzer.resolve_table_alias(self.ctx, alias)
386    }
387
388    pub(super) fn canonicalize_table_reference(&self, name: &str) -> super::TableResolution {
389        self.analyzer.canonicalize_table_reference(name)
390    }
391
392    /// Extracts table identifiers from an expression (best-effort for unsupported constructs).
393    ///
394    /// Used for PIVOT, UNPIVOT, and table functions where full semantic analysis is not
395    /// implemented. This may produce false positives (column references mistaken for tables)
396    /// or false negatives (table references in unhandled expression types).
397    fn extract_identifiers_from_expr(&mut self, expr: &Expr) {
398        match expr {
399            Expr::Identifier(ident) => {
400                self.try_add_identifier_as_table(std::slice::from_ref(ident));
401            }
402            Expr::CompoundIdentifier(idents) => {
403                self.try_add_identifier_as_table(idents);
404            }
405            Expr::Function(func) => {
406                if let ast::FunctionArguments::List(arg_list) = &func.args {
407                    for arg in &arg_list.args {
408                        if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) = arg {
409                            self.extract_identifiers_from_expr(e);
410                        }
411                    }
412                }
413            }
414            Expr::BinaryOp { left, right, .. } => {
415                self.extract_identifiers_from_expr(left);
416                self.extract_identifiers_from_expr(right);
417            }
418            Expr::UnaryOp { expr, .. } => {
419                self.extract_identifiers_from_expr(expr);
420            }
421            Expr::Nested(e) => {
422                self.extract_identifiers_from_expr(e);
423            }
424            Expr::InList { expr, list, .. } => {
425                self.extract_identifiers_from_expr(expr);
426                for e in list {
427                    self.extract_identifiers_from_expr(e);
428                }
429            }
430            Expr::Case {
431                operand,
432                conditions,
433                else_result,
434                ..
435            } => {
436                if let Some(op) = operand {
437                    self.extract_identifiers_from_expr(op);
438                }
439                for case_when in conditions {
440                    self.extract_identifiers_from_expr(&case_when.condition);
441                    self.extract_identifiers_from_expr(&case_when.result);
442                }
443                if let Some(else_r) = else_result {
444                    self.extract_identifiers_from_expr(else_r);
445                }
446            }
447            _ => {}
448        }
449    }
450
451    fn try_add_identifier_as_table(&mut self, idents: &[Ident]) {
452        if idents.is_empty() {
453            return;
454        }
455
456        let name = idents
457            .iter()
458            .map(|i| i.value.as_str())
459            .collect::<Vec<_>>()
460            .join(".");
461
462        let resolution = self.analyzer.canonicalize_table_reference(&name);
463        if resolution.matched_schema {
464            self.add_source_table(&name);
465        }
466    }
467
468    /// Emits a warning for unsupported alias usage in a clause.
469    fn emit_alias_warning(&mut self, clause_name: &str, alias_name: &str) {
470        let dialect = self.analyzer.request.dialect;
471        let statement_index = self.ctx.statement_index;
472        self.analyzer.issues.push(alias_visibility_warning(
473            dialect,
474            clause_name,
475            alias_name,
476            statement_index,
477        ));
478    }
479
480    /// Analyzes ORDER BY clause for alias visibility warnings.
481    ///
482    /// Checks if aliases from the SELECT list are used in ORDER BY expressions
483    /// and emits warnings for dialects that don't support alias references in ORDER BY.
484    fn analyze_order_by(&mut self, order_by: &ast::OrderBy) {
485        let dialect = self.analyzer.request.dialect;
486
487        let order_exprs = match &order_by.kind {
488            ast::OrderByKind::Expressions(exprs) => exprs,
489            ast::OrderByKind::All(_) => return,
490        };
491
492        // Check for alias usage in ORDER BY clause
493        if !dialect.alias_in_order_by() {
494            for order_expr in order_exprs {
495                let identifiers = ExpressionAnalyzer::extract_simple_identifiers(&order_expr.expr);
496                for ident in &identifiers {
497                    let normalized_ident = self.analyzer.normalize_identifier(ident);
498                    if let Some(alias_name) = self
499                        .ctx
500                        .output_columns
501                        .iter()
502                        .find(|c| self.analyzer.normalize_identifier(&c.name) == normalized_ident)
503                        .map(|c| c.name.clone())
504                    {
505                        self.emit_alias_warning("ORDER BY", &alias_name);
506                    }
507                }
508            }
509        }
510
511        // Also analyze any subqueries in ORDER BY expressions
512        for order_expr in order_exprs {
513            let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
514            ea.analyze(&order_expr.expr);
515        }
516    }
517}
518
519impl<'a, 'b> Visitor for LineageVisitor<'a, 'b> {
520    fn visit_query(&mut self, query: &Query) {
521        if let Some(with) = &query.with {
522            let mut cte_ids: Vec<(String, Arc<str>)> = Vec::new();
523            for cte in &with.cte_tables {
524                let cte_name = cte.alias.name.to_string();
525                let cte_span = self.locate_cte_definition_span(&cte_name);
526                let body_span = cte_span.and_then(|span| {
527                    let sql = if let Some(source) = &self.analyzer.current_statement_source {
528                        source.sql.as_ref()
529                    } else {
530                        self.analyzer.request.sql.as_str()
531                    };
532                    find_cte_body_span(sql, span)
533                });
534                let cte_id = self.ctx.add_node(Node {
535                    id: generate_statement_scoped_node_id(
536                        "cte",
537                        self.ctx.statement_index,
538                        &cte_name,
539                    ),
540                    node_type: NodeType::Cte,
541                    label: cte_name.clone().into(),
542                    qualified_name: Some(cte_name.clone().into()),
543                    span: cte_span,
544                    name_spans: cte_span.into_iter().collect(),
545                    body_span,
546                    ..Default::default()
547                });
548                if let Some(span) = cte_span {
549                    let cursor = self.ctx.relation_span_cursor(&cte_name);
550                    *cursor = (*cursor).max(span.end);
551                }
552
553                self.ctx
554                    .cte_definitions
555                    .insert(cte_name.clone(), cte_id.clone());
556                self.ctx
557                    .cte_node_to_name
558                    .insert(cte_id.clone(), cte_name.clone());
559                self.analyzer.tracker.record_cte(&cte_name);
560                cte_ids.push((cte_name, cte_id));
561            }
562
563            for (cte, (_, cte_id)) in with.cte_tables.iter().zip(cte_ids.iter()) {
564                let projection_checkpoint = self.ctx.projection_checkpoint();
565                let mut cte_visitor =
566                    LineageVisitor::new(self.analyzer, self.ctx, Some(cte_id.to_string()));
567                cte_visitor.visit_query(&cte.query);
568                let columns = self.ctx.take_output_columns_since(projection_checkpoint);
569                self.ctx
570                    .register_cte_output_columns(cte.alias.name.to_string(), columns);
571            }
572        }
573        self.visit_set_expr(&query.body);
574
575        // Analyze ORDER BY for alias visibility warnings
576        if let Some(order_by) = &query.order_by {
577            self.analyze_order_by(order_by);
578        }
579    }
580
581    fn visit_set_expr(&mut self, set_expr: &SetExpr) {
582        match set_expr {
583            SetExpr::Select(select) => self.visit_select(select),
584            SetExpr::Query(query) => self.visit_query(query),
585            SetExpr::SetOperation {
586                op, left, right, ..
587            } => {
588                let op_name = match op {
589                    SetOperator::Union => "UNION",
590                    SetOperator::Intersect => "INTERSECT",
591                    SetOperator::Except => "EXCEPT",
592                    SetOperator::Minus => "MINUS",
593                };
594                self.visit_set_expr(left);
595                self.visit_set_expr(right);
596                if self.target_node.is_some() {
597                    self.ctx.last_operation = Some(op_name.to_string());
598                }
599            }
600            SetExpr::Values(values) => self.visit_values(values),
601            SetExpr::Insert(insert_stmt) => {
602                let Statement::Insert(insert) = insert_stmt else {
603                    return;
604                };
605                let target_name = insert.table.to_string();
606                self.add_source_table(&target_name);
607            }
608            SetExpr::Table(tbl) => {
609                let name = tbl
610                    .table_name
611                    .as_ref()
612                    .map(|n| n.to_string())
613                    .unwrap_or_default();
614                if !name.is_empty() {
615                    self.add_source_table(&name);
616                }
617            }
618            _ => {}
619        }
620    }
621
622    fn visit_select(&mut self, select: &Select) {
623        self.ctx.push_scope();
624        for table_with_joins in &select.from {
625            self.visit_table_with_joins(table_with_joins);
626        }
627        if self.analyzer.column_lineage_enabled {
628            let sink_node = self.ctx.sink_node_id().map(|node_id| node_id.to_string());
629            let target_node = self.target_node.clone().or(sink_node);
630            let mut select_analyzer = SelectAnalyzer::new(self.analyzer, self.ctx, target_node);
631            select_analyzer.analyze(select);
632        }
633        self.ctx.pop_scope();
634    }
635
636    fn visit_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
637        self.visit_table_factor(&table_with_joins.relation);
638        for join in &table_with_joins.joins {
639            let (join_type, join_condition) = Analyzer::convert_join_operator(&join.join_operator);
640            self.ctx.current_join_info.join_type = join_type;
641            self.ctx.current_join_info.join_condition = join_condition;
642            self.ctx.last_operation = Analyzer::join_type_to_operation(join_type);
643            self.visit_table_factor(&join.relation);
644
645            // Analyze JOIN condition expression to capture column references for implied schema
646            if let Some(expr) = Self::extract_join_constraint_expr(&join.join_operator) {
647                let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
648                ea.analyze(expr);
649
650                // Extract implied FK relationships from equality conditions
651                self.record_join_fk_relationships(expr);
652            }
653
654            self.ctx.current_join_info.join_type = None;
655            self.ctx.current_join_info.join_condition = None;
656        }
657    }
658
659    fn visit_table_factor(&mut self, table_factor: &TableFactor) {
660        match table_factor {
661            TableFactor::Table { name, alias, .. } => {
662                let table_name = name.to_string();
663                let alias_str = alias.as_ref().map(|a| a.name.to_string());
664                let canonical = self.add_source_table_with_alias(&table_name, alias_str.as_deref());
665                if let (Some(a), Some(canonical_name)) = (&alias_str, &canonical) {
666                    self.ctx
667                        .register_alias_in_scope(a.clone(), canonical_name.clone());
668                }
669            }
670            TableFactor::Derived {
671                subquery, alias, ..
672            } => {
673                // A derived table (subquery in a FROM clause) is treated like a temporary CTE.
674                // We create a node for it in the graph, analyze its subquery to determine its
675                // output columns, and then register its alias and columns in the current scope
676                // so the outer query can reference it.
677                let alias_name = alias.as_ref().map(|a| a.name.to_string());
678                let projection_checkpoint = self.ctx.projection_checkpoint();
679                let derived_span = alias_name
680                    .as_ref()
681                    .and_then(|name| self.locate_derived_alias_span(name));
682
683                // We model derived tables as CTEs in the graph since they are conceptually
684                // similar: both are ephemeral, named result sets scoped to a single query.
685                // This avoids introducing a separate NodeType for a very similar concept.
686                let derived_node_id = alias_name.as_ref().map(|name| {
687                    self.ctx.add_node(Node {
688                        id: generate_statement_scoped_node_id(
689                            "derived",
690                            self.ctx.statement_index,
691                            name,
692                        ),
693                        node_type: NodeType::Cte,
694                        label: name.clone().into(),
695                        qualified_name: Some(name.clone().into()),
696                        span: derived_span,
697                        name_spans: derived_span.into_iter().collect(),
698                        ..Default::default()
699                    })
700                });
701
702                if let (Some(name), Some(node_id)) = (alias_name.as_ref(), derived_node_id.as_ref())
703                {
704                    // Track reverse mapping for wildcard inference without polluting
705                    // the global CTE definition map (which stores real WITH items).
706                    self.ctx
707                        .cte_node_to_name
708                        .insert(node_id.clone(), name.clone());
709                }
710
711                let mut derived_visitor = LineageVisitor::new(
712                    self.analyzer,
713                    self.ctx,
714                    derived_node_id.as_ref().map(|id| id.to_string()),
715                );
716                derived_visitor.visit_query(subquery);
717                let columns = self.ctx.take_output_columns_since(projection_checkpoint);
718
719                if let (Some(name), Some(node_id)) = (alias_name, derived_node_id) {
720                    self.ctx
721                        .register_table_in_scope(name.clone(), node_id.clone());
722                    self.ctx.register_alias_in_scope(name.clone(), name.clone());
723                    self.ctx.register_subquery_columns_in_scope(name, columns);
724                }
725            }
726            TableFactor::NestedJoin {
727                table_with_joins, ..
728            } => {
729                self.visit_table_with_joins(table_with_joins);
730            }
731            TableFactor::TableFunction { expr, alias, .. } => {
732                self.extract_identifiers_from_expr(expr);
733                let is_value_table = matches!(expr, Expr::Function(func) if is_value_table_function(
734                    self.analyzer.request.dialect,
735                    &func.name.to_string(),
736                ));
737                if is_value_table {
738                    self.ctx.mark_table_function_in_scope();
739                }
740                if let Some(a) = alias {
741                    self.ctx
742                        .register_subquery_alias_in_scope(a.name.to_string());
743                }
744                self.analyzer.issues.push(
745                    Issue::info(
746                        issue_codes::UNSUPPORTED_SYNTAX,
747                        "Table function lineage extracted with best-effort identifier matching",
748                    )
749                    .with_statement(self.ctx.statement_index),
750                );
751            }
752            TableFactor::Pivot {
753                table,
754                aggregate_functions,
755                value_column,
756                value_source,
757                alias,
758                ..
759            } => {
760                self.visit_table_factor(table);
761                for func in aggregate_functions {
762                    self.extract_identifiers_from_expr(&func.expr);
763                }
764                for expr in value_column {
765                    self.extract_identifiers_from_expr(expr);
766                }
767                match value_source {
768                    ast::PivotValueSource::List(values) => {
769                        for value in values {
770                            self.extract_identifiers_from_expr(&value.expr);
771                        }
772                    }
773                    ast::PivotValueSource::Any(_) => {}
774                    ast::PivotValueSource::Subquery(q) => {
775                        self.visit_query(q);
776                    }
777                }
778                if let Some(a) = alias {
779                    self.ctx
780                        .register_subquery_alias_in_scope(a.name.to_string());
781                }
782                self.analyzer.issues.push(
783                    Issue::warning(
784                        issue_codes::UNSUPPORTED_SYNTAX,
785                        "PIVOT lineage extracted with best-effort identifier matching",
786                    )
787                    .with_statement(self.ctx.statement_index),
788                );
789            }
790            TableFactor::Unpivot {
791                table,
792                columns,
793                alias,
794                ..
795            } => {
796                self.visit_table_factor(table);
797                for col in columns {
798                    self.extract_identifiers_from_expr(&col.expr);
799                }
800                if let Some(a) = alias {
801                    self.ctx
802                        .register_subquery_alias_in_scope(a.name.to_string());
803                }
804                self.analyzer.issues.push(
805                    Issue::warning(
806                        issue_codes::UNSUPPORTED_SYNTAX,
807                        "UNPIVOT lineage extracted with best-effort identifier matching",
808                    )
809                    .with_statement(self.ctx.statement_index),
810                );
811            }
812            TableFactor::UNNEST {
813                array_exprs, alias, ..
814            } => {
815                // UNNEST expands array columns into rows. Extract column references
816                // from the array expressions and resolve them to their source tables.
817                for expr in array_exprs {
818                    let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
819                    let column_refs = ea.extract_column_refs_with_warning(expr);
820                    for col_ref in &column_refs {
821                        // Resolve the column to its source table and add it as a data source
822                        if let Some(table_canonical) = self.analyzer.resolve_column_table(
823                            self.ctx,
824                            col_ref.table.as_deref(),
825                            &col_ref.column,
826                        ) {
827                            self.add_source_table(&table_canonical);
828                        }
829                    }
830                }
831                if let Some(a) = alias {
832                    self.ctx
833                        .register_subquery_alias_in_scope(a.name.to_string());
834                }
835            }
836            _ => {}
837        }
838    }
839
840    fn visit_values(&mut self, values: &Values) {
841        let mut expr_analyzer = ExpressionAnalyzer::new(self.analyzer, self.ctx);
842        for row in &values.rows {
843            for expr in row {
844                expr_analyzer.analyze(expr);
845            }
846        }
847    }
848}