1use super::context::StatementContext;
8use super::expression::ExpressionAnalyzer;
9use super::helpers::{alias_visibility_warning, generate_node_id};
10use super::select_analyzer::SelectAnalyzer;
11use super::Analyzer;
12use crate::types::{issue_codes, Issue, Node, NodeType};
13use sqlparser::ast::{
14 self, Cte, Expr, Ident, Join, Query, Select, SetExpr, SetOperator, Statement, TableAlias,
15 TableFactor, TableWithJoins, Values,
16};
17use std::sync::Arc;
18
19pub trait Visitor {
24 fn visit_statement(&mut self, statement: &Statement) {
25 match statement {
26 Statement::Query(query) => self.visit_query(query),
27 Statement::Insert(insert) => {
28 if let Some(source) = &insert.source {
29 self.visit_query(source);
30 }
31 }
32 Statement::CreateTable(create) => {
33 if let Some(query) = &create.query {
34 self.visit_query(query);
35 }
36 }
37 Statement::CreateView { query, .. } => self.visit_query(query),
38 _ => {}
39 }
40 }
41
42 fn visit_query(&mut self, query: &Query) {
43 if let Some(with) = &query.with {
44 for cte in &with.cte_tables {
45 self.visit_cte(cte);
46 }
47 }
48 self.visit_set_expr(&query.body);
49 }
50
51 fn visit_cte(&mut self, cte: &Cte) {
52 self.visit_query(&cte.query);
53 }
54
55 fn visit_set_expr(&mut self, set_expr: &SetExpr) {
56 match set_expr {
57 SetExpr::Select(select) => self.visit_select(select),
58 SetExpr::Query(query) => self.visit_query(query),
59 SetExpr::SetOperation { left, right, .. } => {
60 self.visit_set_expr(left);
61 self.visit_set_expr(right);
62 }
63 SetExpr::Values(values) => self.visit_values(values),
64 SetExpr::Insert(stmt) => self.visit_statement(stmt),
65 _ => {}
66 }
67 }
68
69 fn visit_select(&mut self, select: &Select) {
70 for from in &select.from {
71 self.visit_table_with_joins(from);
72 }
73 }
74
75 fn visit_table_with_joins(&mut self, table: &TableWithJoins) {
76 self.visit_table_factor(&table.relation);
77 for join in &table.joins {
78 self.visit_join(join);
79 }
80 }
81
82 fn visit_table_factor(&mut self, table: &TableFactor) {
83 match table {
84 TableFactor::Derived { subquery, .. } => self.visit_query(subquery),
85 TableFactor::NestedJoin {
86 table_with_joins, ..
87 } => self.visit_table_with_joins(table_with_joins),
88 _ => {}
89 }
90 }
91
92 fn visit_join(&mut self, join: &Join) {
93 self.visit_table_factor(&join.relation);
94 }
95
96 fn visit_values(&mut self, values: &Values) {
97 for row in &values.rows {
98 for expr in row {
99 self.visit_expr(expr);
100 }
101 }
102 }
103
104 fn visit_expr(&mut self, _expr: &Expr) {}
105}
106
107pub(crate) struct LineageVisitor<'a, 'b> {
109 pub(crate) analyzer: &'a mut Analyzer<'b>,
110 pub(crate) ctx: &'a mut StatementContext,
111 pub(crate) target_node: Option<String>,
112}
113
114impl<'a, 'b> LineageVisitor<'a, 'b> {
115 pub(crate) fn new(
116 analyzer: &'a mut Analyzer<'b>,
117 ctx: &'a mut StatementContext,
118 target_node: Option<String>,
119 ) -> Self {
120 Self {
121 analyzer,
122 ctx,
123 target_node,
124 }
125 }
126
/// Converts an optional shared node id into an owned `String`.
///
/// Returns `None` when `arc` is `None`.
#[inline]
pub fn target_from_arc(arc: Option<&Arc<str>>) -> Option<String> {
    let shared = arc?;
    Some(String::from(&**shared))
}
131
132 pub fn set_target_node(&mut self, target: Option<String>) {
133 self.target_node = target;
134 }
135
136 pub fn set_last_operation(&mut self, op: Option<String>) {
137 self.ctx.last_operation = op;
138 }
139
140 fn extract_join_constraint_expr(op: &ast::JoinOperator) -> Option<&Expr> {
142 let constraint = match op {
143 ast::JoinOperator::Join(c)
144 | ast::JoinOperator::Inner(c)
145 | ast::JoinOperator::Left(c)
146 | ast::JoinOperator::LeftOuter(c)
147 | ast::JoinOperator::Right(c)
148 | ast::JoinOperator::RightOuter(c)
149 | ast::JoinOperator::FullOuter(c)
150 | ast::JoinOperator::Semi(c)
151 | ast::JoinOperator::LeftSemi(c)
152 | ast::JoinOperator::RightSemi(c)
153 | ast::JoinOperator::Anti(c)
154 | ast::JoinOperator::LeftAnti(c)
155 | ast::JoinOperator::RightAnti(c)
156 | ast::JoinOperator::StraightJoin(c) => Some(c),
157 ast::JoinOperator::AsOf { constraint, .. } => Some(constraint),
158 ast::JoinOperator::CrossJoin(_)
159 | ast::JoinOperator::CrossApply
160 | ast::JoinOperator::OuterApply => None,
161 };
162
163 constraint.and_then(|c| match c {
164 ast::JoinConstraint::On(expr) => Some(expr),
165 _ => None,
166 })
167 }
168
169 fn record_join_fk_relationships(&mut self, expr: &Expr) {
189 use sqlparser::ast::BinaryOperator;
190
191 match expr {
192 Expr::BinaryOp { left, op, right } if *op == BinaryOperator::And => {
193 self.record_join_fk_relationships(left);
195 self.record_join_fk_relationships(right);
196 }
197 Expr::BinaryOp { left, op, right } if *op == BinaryOperator::Eq => {
198 self.record_equality_fk(left, right);
199 }
200 Expr::Nested(inner) => self.record_join_fk_relationships(inner),
201 _ => {}
202 }
203 }
204
205 fn record_equality_fk(&mut self, left: &Expr, right: &Expr) {
207 let Some(left_ref) = Self::extract_column_ref(left) else {
208 return;
209 };
210 let Some(right_ref) = Self::extract_column_ref(right) else {
211 return;
212 };
213
214 let left_table = left_ref
215 .0
216 .as_ref()
217 .and_then(|t| self.resolve_table_alias(Some(t)));
218 let right_table = right_ref
219 .0
220 .as_ref()
221 .and_then(|t| self.resolve_table_alias(Some(t)));
222
223 let (Some(left_table), Some(right_table)) = (left_table, right_table) else {
224 return;
225 };
226
227 self.ctx
229 .record_implied_foreign_key(&left_table, &left_ref.1, &right_table, &right_ref.1);
230 self.ctx
231 .record_implied_foreign_key(&right_table, &right_ref.1, &left_table, &left_ref.1);
232 }
233
234 fn extract_column_ref(expr: &Expr) -> Option<(Option<String>, String)> {
236 match expr {
237 Expr::Identifier(ident) => Some((None, ident.value.clone())),
238 Expr::CompoundIdentifier(idents) if idents.len() == 2 => {
239 Some((Some(idents[0].value.clone()), idents[1].value.clone()))
240 }
241 Expr::CompoundIdentifier(idents) if idents.len() >= 2 => {
242 let len = idents.len();
244 Some((
245 Some(idents[len - 2].value.clone()),
246 idents[len - 1].value.clone(),
247 ))
248 }
249 _ => None,
250 }
251 }
252
253 pub fn add_source_table(&mut self, table_name: &str) -> Option<String> {
254 self.analyzer
255 .add_source_table(self.ctx, table_name, self.target_node.as_deref())
256 }
257
258 pub fn analyze_dml_target(
259 &mut self,
260 table_name: &str,
261 alias: Option<&TableAlias>,
262 ) -> Option<(String, Arc<str>)> {
263 let canonical_res = self.analyzer.add_source_table(self.ctx, table_name, None);
264 let canonical = canonical_res
265 .clone()
266 .unwrap_or_else(|| self.analyzer.normalize_table_name(table_name));
267
268 if let (Some(a), Some(canonical_name)) = (alias, canonical_res) {
269 self.ctx
270 .table_aliases
271 .insert(a.name.to_string(), canonical_name);
272 }
273
274 let node_id = self
275 .ctx
276 .table_node_ids
277 .get(&canonical)
278 .cloned()
279 .unwrap_or_else(|| self.analyzer.relation_node_id(&canonical));
280
281 self.analyzer
282 .tracker
283 .record_produced(&canonical, self.ctx.statement_index);
284 self.analyzer
285 .add_table_columns_from_schema(self.ctx, &canonical, &node_id);
286
287 Some((canonical, node_id))
288 }
289
290 pub fn analyze_dml_target_factor(&mut self, table: &TableFactor) -> Option<Arc<str>> {
291 if let TableFactor::Table { name, alias, .. } = table {
292 let table_name = name.to_string();
293 self.analyze_dml_target(&table_name, alias.as_ref())
294 .map(|(_, node_id)| node_id)
295 } else {
296 self.visit_table_factor(table);
297 None
298 }
299 }
300
301 pub fn analyze_dml_target_from_table_with_joins(
302 &mut self,
303 table: &TableWithJoins,
304 ) -> Option<Arc<str>> {
305 if let TableFactor::Table { name, alias, .. } = &table.relation {
306 let table_name = name.to_string();
307 self.analyze_dml_target(&table_name, alias.as_ref())
308 .map(|(_, node_id)| node_id)
309 } else {
310 self.visit_table_with_joins(table);
311 None
312 }
313 }
314
315 pub fn register_aliases_in_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
316 self.register_aliases_in_table_factor(&table_with_joins.relation);
317 for join in &table_with_joins.joins {
318 self.register_aliases_in_table_factor(&join.relation);
319 }
320 }
321
322 fn register_aliases_in_table_factor(&mut self, table_factor: &TableFactor) {
323 match table_factor {
324 TableFactor::Table {
325 name,
326 alias: Some(a),
327 ..
328 } => {
329 let canonical = self
330 .analyzer
331 .canonicalize_table_reference(&name.to_string())
332 .canonical;
333 self.ctx.table_aliases.insert(a.name.to_string(), canonical);
334 }
335 TableFactor::Derived { alias: Some(a), .. } => {
336 self.ctx.subquery_aliases.insert(a.name.to_string());
337 }
338 TableFactor::NestedJoin {
339 table_with_joins, ..
340 } => {
341 self.register_aliases_in_table_with_joins(table_with_joins);
342 }
343 _ => {}
344 }
345 }
346
347 pub fn resolve_table_alias(&self, alias: Option<&str>) -> Option<String> {
348 self.analyzer.resolve_table_alias(self.ctx, alias)
349 }
350
351 pub(super) fn canonicalize_table_reference(&self, name: &str) -> super::TableResolution {
352 self.analyzer.canonicalize_table_reference(name)
353 }
354
355 fn extract_identifiers_from_expr(&mut self, expr: &Expr) {
361 match expr {
362 Expr::Identifier(ident) => {
363 self.try_add_identifier_as_table(std::slice::from_ref(ident));
364 }
365 Expr::CompoundIdentifier(idents) => {
366 self.try_add_identifier_as_table(idents);
367 }
368 Expr::Function(func) => {
369 if let ast::FunctionArguments::List(arg_list) = &func.args {
370 for arg in &arg_list.args {
371 if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) = arg {
372 self.extract_identifiers_from_expr(e);
373 }
374 }
375 }
376 }
377 Expr::BinaryOp { left, right, .. } => {
378 self.extract_identifiers_from_expr(left);
379 self.extract_identifiers_from_expr(right);
380 }
381 Expr::UnaryOp { expr, .. } => {
382 self.extract_identifiers_from_expr(expr);
383 }
384 Expr::Nested(e) => {
385 self.extract_identifiers_from_expr(e);
386 }
387 Expr::InList { expr, list, .. } => {
388 self.extract_identifiers_from_expr(expr);
389 for e in list {
390 self.extract_identifiers_from_expr(e);
391 }
392 }
393 Expr::Case {
394 operand,
395 conditions,
396 else_result,
397 ..
398 } => {
399 if let Some(op) = operand {
400 self.extract_identifiers_from_expr(op);
401 }
402 for case_when in conditions {
403 self.extract_identifiers_from_expr(&case_when.condition);
404 self.extract_identifiers_from_expr(&case_when.result);
405 }
406 if let Some(else_r) = else_result {
407 self.extract_identifiers_from_expr(else_r);
408 }
409 }
410 _ => {}
411 }
412 }
413
414 fn try_add_identifier_as_table(&mut self, idents: &[Ident]) {
415 if idents.is_empty() {
416 return;
417 }
418
419 let name = idents
420 .iter()
421 .map(|i| i.value.as_str())
422 .collect::<Vec<_>>()
423 .join(".");
424
425 let resolution = self.analyzer.canonicalize_table_reference(&name);
426 if resolution.matched_schema {
427 self.add_source_table(&name);
428 }
429 }
430
431 fn emit_alias_warning(&mut self, clause_name: &str, alias_name: &str) {
433 let dialect = self.analyzer.request.dialect;
434 let statement_index = self.ctx.statement_index;
435 self.analyzer.issues.push(alias_visibility_warning(
436 dialect,
437 clause_name,
438 alias_name,
439 statement_index,
440 ));
441 }
442
443 fn analyze_order_by(&mut self, order_by: &ast::OrderBy) {
448 let dialect = self.analyzer.request.dialect;
449
450 let order_exprs = match &order_by.kind {
451 ast::OrderByKind::Expressions(exprs) => exprs,
452 ast::OrderByKind::All(_) => return,
453 };
454
455 if !dialect.alias_in_order_by() {
457 for order_expr in order_exprs {
458 let identifiers = ExpressionAnalyzer::extract_simple_identifiers(&order_expr.expr);
459 for ident in &identifiers {
460 let normalized_ident = self.analyzer.normalize_identifier(ident);
461 if let Some(alias_name) = self
462 .ctx
463 .output_columns
464 .iter()
465 .find(|c| self.analyzer.normalize_identifier(&c.name) == normalized_ident)
466 .map(|c| c.name.clone())
467 {
468 self.emit_alias_warning("ORDER BY", &alias_name);
469 }
470 }
471 }
472 }
473
474 for order_expr in order_exprs {
476 let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
477 ea.analyze(&order_expr.expr);
478 }
479 }
480}
481
482impl<'a, 'b> Visitor for LineageVisitor<'a, 'b> {
483 fn visit_query(&mut self, query: &Query) {
484 if let Some(with) = &query.with {
485 let mut cte_ids: Vec<(String, Arc<str>)> = Vec::new();
486 for cte in &with.cte_tables {
487 let cte_name = cte.alias.name.to_string();
488 let cte_id = self.ctx.add_node(Node {
489 id: generate_node_id("cte", &cte_name),
490 node_type: NodeType::Cte,
491 label: cte_name.clone().into(),
492 qualified_name: Some(cte_name.clone().into()),
493 expression: None,
494 span: None,
495 metadata: None,
496 resolution_source: None,
497 filters: Vec::new(),
498 join_type: None,
499 join_condition: None,
500 aggregation: None,
501 });
502
503 self.ctx
504 .cte_definitions
505 .insert(cte_name.clone(), cte_id.clone());
506 self.analyzer.tracker.record_cte(&cte_name);
507 cte_ids.push((cte_name, cte_id));
508 }
509
510 for (cte, (_, cte_id)) in with.cte_tables.iter().zip(cte_ids.iter()) {
511 let projection_checkpoint = self.ctx.projection_checkpoint();
512 let mut cte_visitor =
513 LineageVisitor::new(self.analyzer, self.ctx, Some(cte_id.to_string()));
514 cte_visitor.visit_query(&cte.query);
515 let columns = self.ctx.take_output_columns_since(projection_checkpoint);
516 self.ctx
517 .aliased_subquery_columns
518 .insert(cte.alias.name.to_string(), columns);
519 }
520 }
521 self.visit_set_expr(&query.body);
522
523 if let Some(order_by) = &query.order_by {
525 self.analyze_order_by(order_by);
526 }
527 }
528
529 fn visit_set_expr(&mut self, set_expr: &SetExpr) {
530 match set_expr {
531 SetExpr::Select(select) => self.visit_select(select),
532 SetExpr::Query(query) => self.visit_query(query),
533 SetExpr::SetOperation {
534 op, left, right, ..
535 } => {
536 let op_name = match op {
537 SetOperator::Union => "UNION",
538 SetOperator::Intersect => "INTERSECT",
539 SetOperator::Except => "EXCEPT",
540 SetOperator::Minus => "MINUS",
541 };
542 self.visit_set_expr(left);
543 self.visit_set_expr(right);
544 if self.target_node.is_some() {
545 self.ctx.last_operation = Some(op_name.to_string());
546 }
547 }
548 SetExpr::Values(values) => self.visit_values(values),
549 SetExpr::Insert(insert_stmt) => {
550 let Statement::Insert(insert) = insert_stmt else {
551 return;
552 };
553 let target_name = insert.table.to_string();
554 self.add_source_table(&target_name);
555 }
556 SetExpr::Table(tbl) => {
557 let name = tbl
558 .table_name
559 .as_ref()
560 .map(|n| n.to_string())
561 .unwrap_or_default();
562 if !name.is_empty() {
563 self.add_source_table(&name);
564 }
565 }
566 _ => {}
567 }
568 }
569
570 fn visit_select(&mut self, select: &Select) {
571 self.ctx.push_scope();
572 for table_with_joins in &select.from {
573 self.visit_table_with_joins(table_with_joins);
574 }
575 if self.analyzer.column_lineage_enabled {
576 let output_node = self.ctx.output_node_id().map(|node_id| node_id.to_string());
577 let target_node = self.target_node.clone().or(output_node);
578 let mut select_analyzer = SelectAnalyzer::new(self.analyzer, self.ctx, target_node);
579 select_analyzer.analyze(select);
580 }
581 self.ctx.pop_scope();
582 }
583
584 fn visit_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
585 self.visit_table_factor(&table_with_joins.relation);
586 for join in &table_with_joins.joins {
587 let (join_type, join_condition) = Analyzer::convert_join_operator(&join.join_operator);
588 self.ctx.current_join_info.join_type = join_type;
589 self.ctx.current_join_info.join_condition = join_condition;
590 self.ctx.last_operation = Analyzer::join_type_to_operation(join_type);
591 self.visit_table_factor(&join.relation);
592
593 if let Some(expr) = Self::extract_join_constraint_expr(&join.join_operator) {
595 let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
596 ea.analyze(expr);
597
598 self.record_join_fk_relationships(expr);
600 }
601
602 self.ctx.current_join_info.join_type = None;
603 self.ctx.current_join_info.join_condition = None;
604 }
605 }
606
607 fn visit_table_factor(&mut self, table_factor: &TableFactor) {
608 match table_factor {
609 TableFactor::Table { name, alias, .. } => {
610 let table_name = name.to_string();
611 let canonical = self.add_source_table(&table_name);
612 if let (Some(a), Some(canonical_name)) = (alias, canonical) {
613 self.ctx
614 .register_alias_in_scope(a.name.to_string(), canonical_name);
615 }
616 }
617 TableFactor::Derived {
618 subquery, alias, ..
619 } => {
620 let alias_name = alias.as_ref().map(|a| a.name.to_string());
625 let projection_checkpoint = self.ctx.projection_checkpoint();
626
627 let derived_node_id = alias_name.as_ref().map(|name| {
631 self.ctx.add_node(Node {
632 id: generate_node_id("derived", name),
633 node_type: NodeType::Cte,
634 label: name.clone().into(),
635 qualified_name: Some(name.clone().into()),
636 expression: None,
637 span: None,
638 metadata: None,
639 resolution_source: None,
640 filters: Vec::new(),
641 join_type: None,
642 join_condition: None,
643 aggregation: None,
644 })
645 });
646
647 let mut derived_visitor = LineageVisitor::new(
648 self.analyzer,
649 self.ctx,
650 derived_node_id.as_ref().map(|id| id.to_string()),
651 );
652 derived_visitor.visit_query(subquery);
653 let columns = self.ctx.take_output_columns_since(projection_checkpoint);
654
655 if let (Some(name), Some(node_id)) = (alias_name, derived_node_id) {
656 self.ctx
657 .register_table_in_scope(name.clone(), node_id.clone());
658 self.ctx.register_alias_in_scope(name.clone(), name.clone());
659 self.ctx.aliased_subquery_columns.insert(name, columns);
660 }
661 }
662 TableFactor::NestedJoin {
663 table_with_joins, ..
664 } => {
665 self.visit_table_with_joins(table_with_joins);
666 }
667 TableFactor::TableFunction { expr, alias, .. } => {
668 self.extract_identifiers_from_expr(expr);
669 if let Some(a) = alias {
670 self.ctx
671 .register_subquery_alias_in_scope(a.name.to_string());
672 }
673 self.analyzer.issues.push(
674 Issue::info(
675 issue_codes::UNSUPPORTED_SYNTAX,
676 "Table function lineage extracted with best-effort identifier matching",
677 )
678 .with_statement(self.ctx.statement_index),
679 );
680 }
681 TableFactor::Pivot {
682 table,
683 aggregate_functions,
684 value_column,
685 value_source,
686 alias,
687 ..
688 } => {
689 self.visit_table_factor(table);
690 for func in aggregate_functions {
691 self.extract_identifiers_from_expr(&func.expr);
692 }
693 for expr in value_column {
694 self.extract_identifiers_from_expr(expr);
695 }
696 match value_source {
697 ast::PivotValueSource::List(values) => {
698 for value in values {
699 self.extract_identifiers_from_expr(&value.expr);
700 }
701 }
702 ast::PivotValueSource::Any(_) => {}
703 ast::PivotValueSource::Subquery(q) => {
704 self.visit_query(q);
705 }
706 }
707 if let Some(a) = alias {
708 self.ctx
709 .register_subquery_alias_in_scope(a.name.to_string());
710 }
711 self.analyzer.issues.push(
712 Issue::warning(
713 issue_codes::UNSUPPORTED_SYNTAX,
714 "PIVOT lineage extracted with best-effort identifier matching",
715 )
716 .with_statement(self.ctx.statement_index),
717 );
718 }
719 TableFactor::Unpivot {
720 table,
721 columns,
722 alias,
723 ..
724 } => {
725 self.visit_table_factor(table);
726 for col in columns {
727 self.extract_identifiers_from_expr(&col.expr);
728 }
729 if let Some(a) = alias {
730 self.ctx
731 .register_subquery_alias_in_scope(a.name.to_string());
732 }
733 self.analyzer.issues.push(
734 Issue::warning(
735 issue_codes::UNSUPPORTED_SYNTAX,
736 "UNPIVOT lineage extracted with best-effort identifier matching",
737 )
738 .with_statement(self.ctx.statement_index),
739 );
740 }
741 TableFactor::UNNEST {
742 array_exprs, alias, ..
743 } => {
744 for expr in array_exprs {
747 let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
748 let column_refs = ea.extract_column_refs_with_warning(expr);
749 for col_ref in &column_refs {
750 if let Some(table_canonical) = self.analyzer.resolve_column_table(
752 self.ctx,
753 col_ref.table.as_deref(),
754 &col_ref.column,
755 ) {
756 self.add_source_table(&table_canonical);
757 }
758 }
759 }
760 if let Some(a) = alias {
761 self.ctx
762 .register_subquery_alias_in_scope(a.name.to_string());
763 }
764 }
765 _ => {}
766 }
767 }
768
769 fn visit_values(&mut self, values: &Values) {
770 let mut expr_analyzer = ExpressionAnalyzer::new(self.analyzer, self.ctx);
771 for row in &values.rows {
772 for expr in row {
773 expr_analyzer.analyze(expr);
774 }
775 }
776 }
777}