Skip to main content

flowscope_core/analyzer/
visitor.rs

1//! Visitor pattern for AST traversal and lineage analysis.
2//!
3//! This module provides a visitor-based approach to traversing SQL AST nodes
4//! and building lineage graphs. It separates traversal logic (the `Visitor` trait)
5//! from analysis logic (the `LineageVisitor` implementation).
6
7use super::context::StatementContext;
8use super::expression::ExpressionAnalyzer;
9use super::helpers::{
10    alias_visibility_warning, find_cte_definition_span, find_derived_table_alias_span,
11    generate_node_id,
12};
13use super::select_analyzer::SelectAnalyzer;
14use super::Analyzer;
15use crate::generated::is_value_table_function;
16use crate::types::{issue_codes, Issue, Node, NodeType, Span};
17use sqlparser::ast::{
18    self, CreateView, Cte, Expr, Ident, Join, Query, Select, SetExpr, SetOperator, Statement,
19    TableAlias, TableFactor, TableWithJoins, Values,
20};
21use std::sync::Arc;
22
23/// A visitor trait for traversing the SQL AST.
24///
25/// This trait defines default behavior for visiting nodes (traversing children).
26/// Implementors can override specific methods to add custom logic.
27pub trait Visitor {
28    fn visit_statement(&mut self, statement: &Statement) {
29        match statement {
30            Statement::Query(query) => self.visit_query(query),
31            Statement::Insert(insert) => {
32                if let Some(source) = &insert.source {
33                    self.visit_query(source);
34                }
35            }
36            Statement::CreateTable(create) => {
37                if let Some(query) = &create.query {
38                    self.visit_query(query);
39                }
40            }
41            Statement::CreateView(CreateView { query, .. }) => self.visit_query(query),
42            _ => {}
43        }
44    }
45
46    fn visit_query(&mut self, query: &Query) {
47        if let Some(with) = &query.with {
48            for cte in &with.cte_tables {
49                self.visit_cte(cte);
50            }
51        }
52        self.visit_set_expr(&query.body);
53    }
54
55    fn visit_cte(&mut self, cte: &Cte) {
56        self.visit_query(&cte.query);
57    }
58
59    fn visit_set_expr(&mut self, set_expr: &SetExpr) {
60        match set_expr {
61            SetExpr::Select(select) => self.visit_select(select),
62            SetExpr::Query(query) => self.visit_query(query),
63            SetExpr::SetOperation { left, right, .. } => {
64                self.visit_set_expr(left);
65                self.visit_set_expr(right);
66            }
67            SetExpr::Values(values) => self.visit_values(values),
68            SetExpr::Insert(stmt) => self.visit_statement(stmt),
69            _ => {}
70        }
71    }
72
73    fn visit_select(&mut self, select: &Select) {
74        for from in &select.from {
75            self.visit_table_with_joins(from);
76        }
77    }
78
79    fn visit_table_with_joins(&mut self, table: &TableWithJoins) {
80        self.visit_table_factor(&table.relation);
81        for join in &table.joins {
82            self.visit_join(join);
83        }
84    }
85
86    fn visit_table_factor(&mut self, table: &TableFactor) {
87        match table {
88            TableFactor::Derived { subquery, .. } => self.visit_query(subquery),
89            TableFactor::NestedJoin {
90                table_with_joins, ..
91            } => self.visit_table_with_joins(table_with_joins),
92            _ => {}
93        }
94    }
95
96    fn visit_join(&mut self, join: &Join) {
97        self.visit_table_factor(&join.relation);
98    }
99
100    fn visit_values(&mut self, values: &Values) {
101        for row in &values.rows {
102            for expr in row {
103                self.visit_expr(expr);
104            }
105        }
106    }
107
108    fn visit_expr(&mut self, _expr: &Expr) {}
109}
110
/// Visitor implementation that builds the lineage graph.
///
/// Holds mutable borrows of the shared [`Analyzer`] and of the
/// [`StatementContext`] for as long as the visitor lives.
pub(crate) struct LineageVisitor<'a, 'b> {
    /// Analyzer that receives source tables, issues and tracker updates.
    pub(crate) analyzer: &'a mut Analyzer<'b>,
    /// Context of the statement being traversed (nodes, scopes, aliases, span cursor).
    pub(crate) ctx: &'a mut StatementContext,
    /// Node id passed as the target when source tables are registered, if any.
    pub(crate) target_node: Option<String>,
}
117
118impl<'a, 'b> LineageVisitor<'a, 'b> {
119    pub(crate) fn new(
120        analyzer: &'a mut Analyzer<'b>,
121        ctx: &'a mut StatementContext,
122        target_node: Option<String>,
123    ) -> Self {
124        Self {
125            analyzer,
126            ctx,
127            target_node,
128        }
129    }
130
131    #[inline]
132    pub fn target_from_arc(arc: Option<&Arc<str>>) -> Option<String> {
133        arc.map(|s| s.to_string())
134    }
135
    /// Replaces the target node id used for subsequently registered source tables.
    pub fn set_target_node(&mut self, target: Option<String>) {
        self.target_node = target;
    }
139
    /// Overwrites `last_operation` on the statement context with `op`.
    pub fn set_last_operation(&mut self, op: Option<String>) {
        self.ctx.last_operation = op;
    }
143
144    /// Locates a span using the provided finder function.
145    ///
146    /// Handles the common logic for span searching:
147    /// - Uses statement-local SQL when available, full request SQL otherwise
148    /// - Adjusts span coordinates from statement-local to request-global
149    /// - Updates the span search cursor after successful matches
150    fn locate_span<F>(&mut self, identifier: &str, finder: F) -> Option<Span>
151    where
152        F: Fn(&str, &str, usize) -> Option<Span>,
153    {
154        let search_start = self.ctx.span_search_cursor;
155
156        let (sql, offset) = if let Some(source) = &self.analyzer.current_statement_source {
157            (
158                &source.sql[source.range.start..source.range.end],
159                source.range.start,
160            )
161        } else {
162            (self.analyzer.request.sql.as_str(), 0)
163        };
164
165        let span = finder(sql, identifier, search_start)?;
166
167        // Invariant: cursor should only move forward (left-to-right traversal)
168        debug_assert!(
169            span.end >= self.ctx.span_search_cursor,
170            "Span cursor moved backward: {} -> {} (identifier: '{}')",
171            self.ctx.span_search_cursor,
172            span.end,
173            identifier
174        );
175
176        self.ctx.span_search_cursor = span.end;
177        Some(Span::new(offset + span.start, offset + span.end))
178    }
179
    /// Locates the span of a CTE definition named `identifier`, using
    /// `find_cte_definition_span` as the finder for [`Self::locate_span`].
    fn locate_cte_definition_span(&mut self, identifier: &str) -> Option<Span> {
        self.locate_span(identifier, find_cte_definition_span)
    }
183
    /// Locates the span of a derived-table alias named `identifier`, using
    /// `find_derived_table_alias_span` as the finder for [`Self::locate_span`].
    fn locate_derived_alias_span(&mut self, identifier: &str) -> Option<Span> {
        self.locate_span(identifier, find_derived_table_alias_span)
    }
187
188    /// Extract the expression from a JoinOperator's constraint, if any.
189    fn extract_join_constraint_expr(op: &ast::JoinOperator) -> Option<&Expr> {
190        let constraint = match op {
191            ast::JoinOperator::Join(c)
192            | ast::JoinOperator::Inner(c)
193            | ast::JoinOperator::Left(c)
194            | ast::JoinOperator::LeftOuter(c)
195            | ast::JoinOperator::Right(c)
196            | ast::JoinOperator::RightOuter(c)
197            | ast::JoinOperator::FullOuter(c)
198            | ast::JoinOperator::Semi(c)
199            | ast::JoinOperator::LeftSemi(c)
200            | ast::JoinOperator::RightSemi(c)
201            | ast::JoinOperator::Anti(c)
202            | ast::JoinOperator::LeftAnti(c)
203            | ast::JoinOperator::RightAnti(c)
204            | ast::JoinOperator::StraightJoin(c) => Some(c),
205            ast::JoinOperator::AsOf { constraint, .. } => Some(constraint),
206            ast::JoinOperator::CrossJoin(_)
207            | ast::JoinOperator::CrossApply
208            | ast::JoinOperator::OuterApply => None,
209        };
210
211        constraint.and_then(|c| match c {
212            ast::JoinConstraint::On(expr) => Some(expr),
213            _ => None,
214        })
215    }
216
217    /// Extract and record implied foreign key relationships from a JOIN condition.
218    ///
219    /// For equality expressions like `t1.a = t2.b`, we record **both directions**
220    /// as potential FK relationships. This is intentional because:
221    ///
222    /// 1. **No authoritative direction**: From syntax alone, we cannot determine
223    ///    which column is the FK and which is the referenced PK. The true direction
224    ///    depends on schema knowledge we may not have.
225    ///
226    /// 2. **Consumer deduplication**: Downstream consumers (like the React SchemaView)
227    ///    normalize and deduplicate reciprocal FK edges before rendering, so storing
228    ///    both directions doesn't create duplicate visual edges.
229    ///
230    /// 3. **Heuristic accuracy**: Recording both ensures we capture the relationship
231    ///    regardless of how the user wrote the JOIN condition (`a.id = b.a_id` vs
232    ///    `b.a_id = a.id`).
233    ///
234    /// Self-joins are excluded since `t.a = t.b` within the same table doesn't
235    /// imply a cross-table FK relationship (see [`StatementContext::record_implied_foreign_key`]).
236    fn record_join_fk_relationships(&mut self, expr: &Expr) {
237        use sqlparser::ast::BinaryOperator;
238
239        match expr {
240            Expr::BinaryOp { left, op, right } if *op == BinaryOperator::And => {
241                // Recurse into AND conditions (common in multi-column joins)
242                self.record_join_fk_relationships(left);
243                self.record_join_fk_relationships(right);
244            }
245            Expr::BinaryOp { left, op, right } if *op == BinaryOperator::Eq => {
246                self.record_equality_fk(left, right);
247            }
248            Expr::Nested(inner) => self.record_join_fk_relationships(inner),
249            _ => {}
250        }
251    }
252
253    /// Record FK relationships from an equality expression (t1.a = t2.b).
254    fn record_equality_fk(&mut self, left: &Expr, right: &Expr) {
255        let Some(left_ref) = Self::extract_column_ref(left) else {
256            return;
257        };
258        let Some(right_ref) = Self::extract_column_ref(right) else {
259            return;
260        };
261
262        let left_table = left_ref
263            .0
264            .as_ref()
265            .and_then(|t| self.resolve_table_alias(Some(t)));
266        let right_table = right_ref
267            .0
268            .as_ref()
269            .and_then(|t| self.resolve_table_alias(Some(t)));
270
271        let (Some(left_table), Some(right_table)) = (left_table, right_table) else {
272            return;
273        };
274
275        // Record FK in both directions (see record_join_fk_relationships docs for rationale)
276        self.ctx
277            .record_implied_foreign_key(&left_table, &left_ref.1, &right_table, &right_ref.1);
278        self.ctx
279            .record_implied_foreign_key(&right_table, &right_ref.1, &left_table, &left_ref.1);
280    }
281
282    /// Extract a (table, column) pair from a simple column reference expression.
283    fn extract_column_ref(expr: &Expr) -> Option<(Option<String>, String)> {
284        match expr {
285            Expr::Identifier(ident) => Some((None, ident.value.clone())),
286            Expr::CompoundIdentifier(idents) if idents.len() == 2 => {
287                Some((Some(idents[0].value.clone()), idents[1].value.clone()))
288            }
289            Expr::CompoundIdentifier(idents) if idents.len() >= 2 => {
290                // schema.table.column - take last two parts
291                let len = idents.len();
292                Some((
293                    Some(idents[len - 2].value.clone()),
294                    idents[len - 1].value.clone(),
295                ))
296            }
297            _ => None,
298        }
299    }
300
301    pub fn add_source_table(&mut self, table_name: &str) -> Option<String> {
302        self.analyzer
303            .add_source_table(self.ctx, table_name, self.target_node.as_deref(), None)
304    }
305
306    pub fn add_source_table_with_alias(
307        &mut self,
308        table_name: &str,
309        alias: Option<&str>,
310    ) -> Option<String> {
311        self.analyzer
312            .add_source_table(self.ctx, table_name, self.target_node.as_deref(), alias)
313    }
314
315    pub fn analyze_dml_target(
316        &mut self,
317        table_name: &str,
318        alias: Option<&TableAlias>,
319    ) -> Option<(String, Arc<str>)> {
320        let canonical_res = self
321            .analyzer
322            .add_source_table(self.ctx, table_name, None, None);
323        let canonical = canonical_res
324            .clone()
325            .unwrap_or_else(|| self.analyzer.normalize_table_name(table_name));
326
327        if let (Some(a), Some(canonical_name)) = (alias, canonical_res) {
328            self.ctx
329                .table_aliases
330                .insert(a.name.to_string(), canonical_name);
331        }
332
333        let node_id = self
334            .ctx
335            .table_node_ids
336            .get(&canonical)
337            .cloned()
338            .unwrap_or_else(|| self.analyzer.relation_node_id(&canonical));
339
340        self.analyzer
341            .tracker
342            .record_produced(&canonical, self.ctx.statement_index);
343        self.analyzer
344            .add_table_columns_from_schema(self.ctx, &canonical, &node_id);
345
346        Some((canonical, node_id))
347    }
348
349    pub fn analyze_dml_target_factor(&mut self, table: &TableFactor) -> Option<Arc<str>> {
350        if let TableFactor::Table { name, alias, .. } = table {
351            let table_name = name.to_string();
352            self.analyze_dml_target(&table_name, alias.as_ref())
353                .map(|(_, node_id)| node_id)
354        } else {
355            self.visit_table_factor(table);
356            None
357        }
358    }
359
360    pub fn analyze_dml_target_from_table_with_joins(
361        &mut self,
362        table: &TableWithJoins,
363    ) -> Option<Arc<str>> {
364        if let TableFactor::Table { name, alias, .. } = &table.relation {
365            let table_name = name.to_string();
366            self.analyze_dml_target(&table_name, alias.as_ref())
367                .map(|(_, node_id)| node_id)
368        } else {
369            self.visit_table_with_joins(table);
370            None
371        }
372    }
373
374    pub fn register_aliases_in_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
375        self.register_aliases_in_table_factor(&table_with_joins.relation);
376        for join in &table_with_joins.joins {
377            self.register_aliases_in_table_factor(&join.relation);
378        }
379    }
380
381    fn register_aliases_in_table_factor(&mut self, table_factor: &TableFactor) {
382        match table_factor {
383            TableFactor::Table {
384                name,
385                alias: Some(a),
386                ..
387            } => {
388                let canonical = self
389                    .analyzer
390                    .canonicalize_table_reference(&name.to_string())
391                    .canonical;
392                self.ctx.table_aliases.insert(a.name.to_string(), canonical);
393            }
394            TableFactor::Derived { alias: Some(a), .. } => {
395                self.ctx.subquery_aliases.insert(a.name.to_string());
396            }
397            TableFactor::NestedJoin {
398                table_with_joins, ..
399            } => {
400                self.register_aliases_in_table_with_joins(table_with_joins);
401            }
402            _ => {}
403        }
404    }
405
    /// Resolves `alias` against the current statement context by delegating to
    /// `Analyzer::resolve_table_alias`; returns its result unchanged.
    pub fn resolve_table_alias(&self, alias: Option<&str>) -> Option<String> {
        self.analyzer.resolve_table_alias(self.ctx, alias)
    }
409
    /// Canonicalizes a table reference by delegating to
    /// `Analyzer::canonicalize_table_reference`; returns its result unchanged.
    pub(super) fn canonicalize_table_reference(&self, name: &str) -> super::TableResolution {
        self.analyzer.canonicalize_table_reference(name)
    }
413
414    /// Extracts table identifiers from an expression (best-effort for unsupported constructs).
415    ///
416    /// Used for PIVOT, UNPIVOT, and table functions where full semantic analysis is not
417    /// implemented. This may produce false positives (column references mistaken for tables)
418    /// or false negatives (table references in unhandled expression types).
419    fn extract_identifiers_from_expr(&mut self, expr: &Expr) {
420        match expr {
421            Expr::Identifier(ident) => {
422                self.try_add_identifier_as_table(std::slice::from_ref(ident));
423            }
424            Expr::CompoundIdentifier(idents) => {
425                self.try_add_identifier_as_table(idents);
426            }
427            Expr::Function(func) => {
428                if let ast::FunctionArguments::List(arg_list) = &func.args {
429                    for arg in &arg_list.args {
430                        if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) = arg {
431                            self.extract_identifiers_from_expr(e);
432                        }
433                    }
434                }
435            }
436            Expr::BinaryOp { left, right, .. } => {
437                self.extract_identifiers_from_expr(left);
438                self.extract_identifiers_from_expr(right);
439            }
440            Expr::UnaryOp { expr, .. } => {
441                self.extract_identifiers_from_expr(expr);
442            }
443            Expr::Nested(e) => {
444                self.extract_identifiers_from_expr(e);
445            }
446            Expr::InList { expr, list, .. } => {
447                self.extract_identifiers_from_expr(expr);
448                for e in list {
449                    self.extract_identifiers_from_expr(e);
450                }
451            }
452            Expr::Case {
453                operand,
454                conditions,
455                else_result,
456                ..
457            } => {
458                if let Some(op) = operand {
459                    self.extract_identifiers_from_expr(op);
460                }
461                for case_when in conditions {
462                    self.extract_identifiers_from_expr(&case_when.condition);
463                    self.extract_identifiers_from_expr(&case_when.result);
464                }
465                if let Some(else_r) = else_result {
466                    self.extract_identifiers_from_expr(else_r);
467                }
468            }
469            _ => {}
470        }
471    }
472
473    fn try_add_identifier_as_table(&mut self, idents: &[Ident]) {
474        if idents.is_empty() {
475            return;
476        }
477
478        let name = idents
479            .iter()
480            .map(|i| i.value.as_str())
481            .collect::<Vec<_>>()
482            .join(".");
483
484        let resolution = self.analyzer.canonicalize_table_reference(&name);
485        if resolution.matched_schema {
486            self.add_source_table(&name);
487        }
488    }
489
490    /// Emits a warning for unsupported alias usage in a clause.
491    fn emit_alias_warning(&mut self, clause_name: &str, alias_name: &str) {
492        let dialect = self.analyzer.request.dialect;
493        let statement_index = self.ctx.statement_index;
494        self.analyzer.issues.push(alias_visibility_warning(
495            dialect,
496            clause_name,
497            alias_name,
498            statement_index,
499        ));
500    }
501
502    /// Analyzes ORDER BY clause for alias visibility warnings.
503    ///
504    /// Checks if aliases from the SELECT list are used in ORDER BY expressions
505    /// and emits warnings for dialects that don't support alias references in ORDER BY.
506    fn analyze_order_by(&mut self, order_by: &ast::OrderBy) {
507        let dialect = self.analyzer.request.dialect;
508
509        let order_exprs = match &order_by.kind {
510            ast::OrderByKind::Expressions(exprs) => exprs,
511            ast::OrderByKind::All(_) => return,
512        };
513
514        // Check for alias usage in ORDER BY clause
515        if !dialect.alias_in_order_by() {
516            for order_expr in order_exprs {
517                let identifiers = ExpressionAnalyzer::extract_simple_identifiers(&order_expr.expr);
518                for ident in &identifiers {
519                    let normalized_ident = self.analyzer.normalize_identifier(ident);
520                    if let Some(alias_name) = self
521                        .ctx
522                        .output_columns
523                        .iter()
524                        .find(|c| self.analyzer.normalize_identifier(&c.name) == normalized_ident)
525                        .map(|c| c.name.clone())
526                    {
527                        self.emit_alias_warning("ORDER BY", &alias_name);
528                    }
529                }
530            }
531        }
532
533        // Also analyze any subqueries in ORDER BY expressions
534        for order_expr in order_exprs {
535            let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
536            ea.analyze(&order_expr.expr);
537        }
538    }
539}
540
541impl<'a, 'b> Visitor for LineageVisitor<'a, 'b> {
    /// Visits a query: its CTEs, then its body, then its ORDER BY clause.
    ///
    /// CTEs are handled in two passes. The first pass creates a graph node for
    /// every CTE and registers its name, so that all CTE names of the `WITH`
    /// clause are known before any CTE body is analyzed. The second pass visits
    /// each CTE body with a nested visitor targeting that CTE's node and stores
    /// the output columns produced by the body under the CTE's name.
    fn visit_query(&mut self, query: &Query) {
        if let Some(with) = &query.with {
            // Pass 1: create a node per CTE and register name <-> node id mappings.
            let mut cte_ids: Vec<(String, Arc<str>)> = Vec::new();
            for cte in &with.cte_tables {
                let cte_name = cte.alias.name.to_string();
                // Locating the span also advances the span search cursor on a match.
                let cte_span = self.locate_cte_definition_span(&cte_name);
                let cte_id = self.ctx.add_node(Node {
                    id: generate_node_id("cte", &cte_name),
                    node_type: NodeType::Cte,
                    label: cte_name.clone().into(),
                    qualified_name: Some(cte_name.clone().into()),
                    expression: None,
                    span: cte_span,
                    metadata: None,
                    resolution_source: None,
                    filters: Vec::new(),
                    aggregation: None,
                });

                self.ctx
                    .cte_definitions
                    .insert(cte_name.clone(), cte_id.clone());
                self.ctx
                    .cte_node_to_name
                    .insert(cte_id.clone(), cte_name.clone());
                self.analyzer.tracker.record_cte(&cte_name);
                cte_ids.push((cte_name, cte_id));
            }

            // Pass 2: analyze each CTE body with the CTE's own node as the target.
            for (cte, (_, cte_id)) in with.cte_tables.iter().zip(cte_ids.iter()) {
                // The checkpoint lets us collect only the columns produced by this body.
                let projection_checkpoint = self.ctx.projection_checkpoint();
                let mut cte_visitor =
                    LineageVisitor::new(self.analyzer, self.ctx, Some(cte_id.to_string()));
                cte_visitor.visit_query(&cte.query);
                let columns = self.ctx.take_output_columns_since(projection_checkpoint);
                self.ctx
                    .register_cte_output_columns(cte.alias.name.to_string(), columns);
            }
        }
        self.visit_set_expr(&query.body);

        // Analyze ORDER BY for alias visibility warnings
        if let Some(order_by) = &query.order_by {
            self.analyze_order_by(order_by);
        }
    }
588
589    fn visit_set_expr(&mut self, set_expr: &SetExpr) {
590        match set_expr {
591            SetExpr::Select(select) => self.visit_select(select),
592            SetExpr::Query(query) => self.visit_query(query),
593            SetExpr::SetOperation {
594                op, left, right, ..
595            } => {
596                let op_name = match op {
597                    SetOperator::Union => "UNION",
598                    SetOperator::Intersect => "INTERSECT",
599                    SetOperator::Except => "EXCEPT",
600                    SetOperator::Minus => "MINUS",
601                };
602                self.visit_set_expr(left);
603                self.visit_set_expr(right);
604                if self.target_node.is_some() {
605                    self.ctx.last_operation = Some(op_name.to_string());
606                }
607            }
608            SetExpr::Values(values) => self.visit_values(values),
609            SetExpr::Insert(insert_stmt) => {
610                let Statement::Insert(insert) = insert_stmt else {
611                    return;
612                };
613                let target_name = insert.table.to_string();
614                self.add_source_table(&target_name);
615            }
616            SetExpr::Table(tbl) => {
617                let name = tbl
618                    .table_name
619                    .as_ref()
620                    .map(|n| n.to_string())
621                    .unwrap_or_default();
622                if !name.is_empty() {
623                    self.add_source_table(&name);
624                }
625            }
626            _ => {}
627        }
628    }
629
630    fn visit_select(&mut self, select: &Select) {
631        self.ctx.push_scope();
632        for table_with_joins in &select.from {
633            self.visit_table_with_joins(table_with_joins);
634        }
635        if self.analyzer.column_lineage_enabled {
636            let output_node = self.ctx.output_node_id().map(|node_id| node_id.to_string());
637            let target_node = self.target_node.clone().or(output_node);
638            let mut select_analyzer = SelectAnalyzer::new(self.analyzer, self.ctx, target_node);
639            select_analyzer.analyze(select);
640        }
641        self.ctx.pop_scope();
642    }
643
644    fn visit_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
645        self.visit_table_factor(&table_with_joins.relation);
646        for join in &table_with_joins.joins {
647            let (join_type, join_condition) = Analyzer::convert_join_operator(&join.join_operator);
648            self.ctx.current_join_info.join_type = join_type;
649            self.ctx.current_join_info.join_condition = join_condition;
650            self.ctx.last_operation = Analyzer::join_type_to_operation(join_type);
651            self.visit_table_factor(&join.relation);
652
653            // Analyze JOIN condition expression to capture column references for implied schema
654            if let Some(expr) = Self::extract_join_constraint_expr(&join.join_operator) {
655                let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
656                ea.analyze(expr);
657
658                // Extract implied FK relationships from equality conditions
659                self.record_join_fk_relationships(expr);
660            }
661
662            self.ctx.current_join_info.join_type = None;
663            self.ctx.current_join_info.join_condition = None;
664        }
665    }
666
    /// Visits one table factor of a FROM clause and registers what it contributes.
    ///
    /// - Plain table: added as a source table; its alias (if any) is registered
    ///   in the current scope when the analyzer returned a canonical name.
    /// - Derived table: analyzed with a nested visitor; an aliased derived table
    ///   gets its own graph node and its alias and output columns are registered.
    /// - Nested join: visited recursively.
    /// - Table function, PIVOT, UNPIVOT: identifiers are matched on a best-effort
    ///   basis and an `UNSUPPORTED_SYNTAX` issue is pushed.
    /// - UNNEST: the tables owning the referenced columns are added as sources.
    /// - Any other factor is ignored.
    fn visit_table_factor(&mut self, table_factor: &TableFactor) {
        match table_factor {
            TableFactor::Table { name, alias, .. } => {
                let table_name = name.to_string();
                let alias_str = alias.as_ref().map(|a| a.name.to_string());
                let canonical = self.add_source_table_with_alias(&table_name, alias_str.as_deref());
                // The alias is only registered when a canonical name came back.
                if let (Some(a), Some(canonical_name)) = (&alias_str, &canonical) {
                    self.ctx
                        .register_alias_in_scope(a.clone(), canonical_name.clone());
                }
            }
            TableFactor::Derived {
                subquery, alias, ..
            } => {
                // A derived table (subquery in a FROM clause) is treated like a temporary CTE.
                // We create a node for it in the graph, analyze its subquery to determine its
                // output columns, and then register its alias and columns in the current scope
                // so the outer query can reference it.
                let alias_name = alias.as_ref().map(|a| a.name.to_string());
                let projection_checkpoint = self.ctx.projection_checkpoint();
                // NOTE(review): the alias span is located (and the span cursor advanced)
                // before the subquery is visited — confirm this ordering is intended.
                let derived_span = alias_name
                    .as_ref()
                    .and_then(|name| self.locate_derived_alias_span(name));

                // We model derived tables as CTEs in the graph since they are conceptually
                // similar: both are ephemeral, named result sets scoped to a single query.
                // This avoids introducing a separate NodeType for a very similar concept.
                let derived_node_id = alias_name.as_ref().map(|name| {
                    self.ctx.add_node(Node {
                        id: generate_node_id("derived", name),
                        node_type: NodeType::Cte,
                        label: name.clone().into(),
                        qualified_name: Some(name.clone().into()),
                        expression: None,
                        span: derived_span,
                        metadata: None,
                        resolution_source: None,
                        filters: Vec::new(),
                        aggregation: None,
                    })
                });

                if let (Some(name), Some(node_id)) = (alias_name.as_ref(), derived_node_id.as_ref())
                {
                    // Track reverse mapping for wildcard inference without polluting
                    // the global CTE definition map (which stores real WITH items).
                    self.ctx
                        .cte_node_to_name
                        .insert(node_id.clone(), name.clone());
                }

                // The subquery is analyzed with the derived node (if any) as its target.
                let mut derived_visitor = LineageVisitor::new(
                    self.analyzer,
                    self.ctx,
                    derived_node_id.as_ref().map(|id| id.to_string()),
                );
                derived_visitor.visit_query(subquery);
                // Only the columns produced since the checkpoint belong to this subquery.
                let columns = self.ctx.take_output_columns_since(projection_checkpoint);

                // Un-aliased derived tables are analyzed but not registered in scope.
                if let (Some(name), Some(node_id)) = (alias_name, derived_node_id) {
                    self.ctx
                        .register_table_in_scope(name.clone(), node_id.clone());
                    self.ctx.register_alias_in_scope(name.clone(), name.clone());
                    self.ctx.register_subquery_columns_in_scope(name, columns);
                }
            }
            TableFactor::NestedJoin {
                table_with_joins, ..
            } => {
                self.visit_table_with_joins(table_with_joins);
            }
            TableFactor::TableFunction { expr, alias, .. } => {
                // Best-effort: any identifier in the call that matches a known table
                // is added as a source.
                self.extract_identifiers_from_expr(expr);
                let is_value_table = matches!(expr, Expr::Function(func) if is_value_table_function(
                    self.analyzer.request.dialect,
                    &func.name.to_string(),
                ));
                if is_value_table {
                    self.ctx.mark_table_function_in_scope();
                }
                if let Some(a) = alias {
                    self.ctx
                        .register_subquery_alias_in_scope(a.name.to_string());
                }
                // Reported at info level, unlike PIVOT/UNPIVOT which use warnings.
                self.analyzer.issues.push(
                    Issue::info(
                        issue_codes::UNSUPPORTED_SYNTAX,
                        "Table function lineage extracted with best-effort identifier matching",
                    )
                    .with_statement(self.ctx.statement_index),
                );
            }
            TableFactor::Pivot {
                table,
                aggregate_functions,
                value_column,
                value_source,
                alias,
                ..
            } => {
                // The pivoted input is visited like any other table factor.
                self.visit_table_factor(table);
                for func in aggregate_functions {
                    self.extract_identifiers_from_expr(&func.expr);
                }
                for expr in value_column {
                    self.extract_identifiers_from_expr(expr);
                }
                match value_source {
                    ast::PivotValueSource::List(values) => {
                        for value in values {
                            self.extract_identifiers_from_expr(&value.expr);
                        }
                    }
                    ast::PivotValueSource::Any(_) => {}
                    ast::PivotValueSource::Subquery(q) => {
                        self.visit_query(q);
                    }
                }
                if let Some(a) = alias {
                    self.ctx
                        .register_subquery_alias_in_scope(a.name.to_string());
                }
                self.analyzer.issues.push(
                    Issue::warning(
                        issue_codes::UNSUPPORTED_SYNTAX,
                        "PIVOT lineage extracted with best-effort identifier matching",
                    )
                    .with_statement(self.ctx.statement_index),
                );
            }
            TableFactor::Unpivot {
                table,
                columns,
                alias,
                ..
            } => {
                // The unpivoted input is visited like any other table factor.
                self.visit_table_factor(table);
                for col in columns {
                    self.extract_identifiers_from_expr(&col.expr);
                }
                if let Some(a) = alias {
                    self.ctx
                        .register_subquery_alias_in_scope(a.name.to_string());
                }
                self.analyzer.issues.push(
                    Issue::warning(
                        issue_codes::UNSUPPORTED_SYNTAX,
                        "UNPIVOT lineage extracted with best-effort identifier matching",
                    )
                    .with_statement(self.ctx.statement_index),
                );
            }
            TableFactor::UNNEST {
                array_exprs, alias, ..
            } => {
                // UNNEST expands array columns into rows. Extract column references
                // from the array expressions and resolve them to their source tables.
                for expr in array_exprs {
                    let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
                    let column_refs = ea.extract_column_refs_with_warning(expr);
                    for col_ref in &column_refs {
                        // Resolve the column to its source table and add it as a data source
                        if let Some(table_canonical) = self.analyzer.resolve_column_table(
                            self.ctx,
                            col_ref.table.as_deref(),
                            &col_ref.column,
                        ) {
                            self.add_source_table(&table_canonical);
                        }
                    }
                }
                if let Some(a) = alias {
                    self.ctx
                        .register_subquery_alias_in_scope(a.name.to_string());
                }
            }
            _ => {}
        }
    }
846
847    fn visit_values(&mut self, values: &Values) {
848        let mut expr_analyzer = ExpressionAnalyzer::new(self.analyzer, self.ctx);
849        for row in &values.rows {
850            for expr in row {
851                expr_analyzer.analyze(expr);
852            }
853        }
854    }
855}