use super::context::StatementContext;
use super::expression::ExpressionAnalyzer;
use super::helpers::{
alias_visibility_warning, find_cte_body_span, find_cte_definition_span,
find_derived_table_alias_span, generate_statement_scoped_node_id,
};
use super::select_analyzer::SelectAnalyzer;
use super::Analyzer;
use crate::generated::is_value_table_function;
use crate::types::{issue_codes, Issue, Node, NodeType, Span};
use sqlparser::ast::{
self, CreateView, Cte, Expr, Ident, Join, Query, Select, SetExpr, SetOperator, Statement,
TableAlias, TableFactor, TableWithJoins, Values,
};
use std::sync::Arc;
pub trait Visitor {
fn visit_statement(&mut self, statement: &Statement) {
match statement {
Statement::Query(query) => self.visit_query(query),
Statement::Insert(insert) => {
if let Some(source) = &insert.source {
self.visit_query(source);
}
}
Statement::CreateTable(create) => {
if let Some(query) = &create.query {
self.visit_query(query);
}
}
Statement::CreateView(CreateView { query, .. }) => self.visit_query(query),
_ => {}
}
}
fn visit_query(&mut self, query: &Query) {
if let Some(with) = &query.with {
for cte in &with.cte_tables {
self.visit_cte(cte);
}
}
self.visit_set_expr(&query.body);
}
fn visit_cte(&mut self, cte: &Cte) {
self.visit_query(&cte.query);
}
fn visit_set_expr(&mut self, set_expr: &SetExpr) {
match set_expr {
SetExpr::Select(select) => self.visit_select(select),
SetExpr::Query(query) => self.visit_query(query),
SetExpr::SetOperation { left, right, .. } => {
self.visit_set_expr(left);
self.visit_set_expr(right);
}
SetExpr::Values(values) => self.visit_values(values),
SetExpr::Insert(stmt) => self.visit_statement(stmt),
_ => {}
}
}
fn visit_select(&mut self, select: &Select) {
for from in &select.from {
self.visit_table_with_joins(from);
}
}
fn visit_table_with_joins(&mut self, table: &TableWithJoins) {
self.visit_table_factor(&table.relation);
for join in &table.joins {
self.visit_join(join);
}
}
fn visit_table_factor(&mut self, table: &TableFactor) {
match table {
TableFactor::Derived { subquery, .. } => self.visit_query(subquery),
TableFactor::NestedJoin {
table_with_joins, ..
} => self.visit_table_with_joins(table_with_joins),
_ => {}
}
}
fn visit_join(&mut self, join: &Join) {
self.visit_table_factor(&join.relation);
}
fn visit_values(&mut self, values: &Values) {
for row in &values.rows {
for expr in row {
self.visit_expr(expr);
}
}
}
fn visit_expr(&mut self, _expr: &Expr) {}
}
pub(crate) struct LineageVisitor<'a, 'b> {
pub(crate) analyzer: &'a mut Analyzer<'b>,
pub(crate) ctx: &'a mut StatementContext,
pub(crate) target_node: Option<String>,
}
impl<'a, 'b> LineageVisitor<'a, 'b> {
pub(crate) fn new(
analyzer: &'a mut Analyzer<'b>,
ctx: &'a mut StatementContext,
target_node: Option<String>,
) -> Self {
Self {
analyzer,
ctx,
target_node,
}
}
#[inline]
pub fn target_from_arc(arc: Option<&Arc<str>>) -> Option<String> {
arc.map(|s| s.to_string())
}
pub fn set_target_node(&mut self, target: Option<String>) {
self.target_node = target;
}
pub fn set_last_operation(&mut self, op: Option<String>) {
self.ctx.last_operation = op;
}
fn locate_span<F>(&mut self, identifier: &str, finder: F) -> Option<Span>
where
F: Fn(&str, &str, usize) -> Option<Span>,
{
self.analyzer
.locate_statement_span(self.ctx, identifier, finder)
}
fn locate_cte_definition_span(&mut self, identifier: &str) -> Option<Span> {
self.locate_span(identifier, find_cte_definition_span)
}
fn locate_derived_alias_span(&mut self, identifier: &str) -> Option<Span> {
self.locate_span(identifier, find_derived_table_alias_span)
}
fn extract_join_constraint_expr(op: &ast::JoinOperator) -> Option<&Expr> {
let constraint = match op {
ast::JoinOperator::Join(c)
| ast::JoinOperator::Inner(c)
| ast::JoinOperator::Left(c)
| ast::JoinOperator::LeftOuter(c)
| ast::JoinOperator::Right(c)
| ast::JoinOperator::RightOuter(c)
| ast::JoinOperator::FullOuter(c)
| ast::JoinOperator::Semi(c)
| ast::JoinOperator::LeftSemi(c)
| ast::JoinOperator::RightSemi(c)
| ast::JoinOperator::Anti(c)
| ast::JoinOperator::LeftAnti(c)
| ast::JoinOperator::RightAnti(c)
| ast::JoinOperator::StraightJoin(c) => Some(c),
ast::JoinOperator::AsOf { constraint, .. } => Some(constraint),
ast::JoinOperator::CrossJoin(_)
| ast::JoinOperator::CrossApply
| ast::JoinOperator::OuterApply => None,
};
constraint.and_then(|c| match c {
ast::JoinConstraint::On(expr) => Some(expr),
_ => None,
})
}
fn record_join_fk_relationships(&mut self, expr: &Expr) {
use sqlparser::ast::BinaryOperator;
match expr {
Expr::BinaryOp { left, op, right } if *op == BinaryOperator::And => {
self.record_join_fk_relationships(left);
self.record_join_fk_relationships(right);
}
Expr::BinaryOp { left, op, right } if *op == BinaryOperator::Eq => {
self.record_equality_fk(left, right);
}
Expr::Nested(inner) => self.record_join_fk_relationships(inner),
_ => {}
}
}
fn record_equality_fk(&mut self, left: &Expr, right: &Expr) {
let Some(left_ref) = Self::extract_column_ref(left) else {
return;
};
let Some(right_ref) = Self::extract_column_ref(right) else {
return;
};
let left_table = left_ref
.0
.as_ref()
.and_then(|t| self.resolve_table_alias(Some(t)));
let right_table = right_ref
.0
.as_ref()
.and_then(|t| self.resolve_table_alias(Some(t)));
let (Some(left_table), Some(right_table)) = (left_table, right_table) else {
return;
};
self.ctx
.record_implied_foreign_key(&left_table, &left_ref.1, &right_table, &right_ref.1);
self.ctx
.record_implied_foreign_key(&right_table, &right_ref.1, &left_table, &left_ref.1);
}
fn extract_column_ref(expr: &Expr) -> Option<(Option<String>, String)> {
match expr {
Expr::Identifier(ident) => Some((None, ident.value.clone())),
Expr::CompoundIdentifier(idents) if idents.len() == 2 => {
Some((Some(idents[0].value.clone()), idents[1].value.clone()))
}
Expr::CompoundIdentifier(idents) if idents.len() >= 2 => {
let len = idents.len();
Some((
Some(idents[len - 2].value.clone()),
idents[len - 1].value.clone(),
))
}
_ => None,
}
}
pub fn add_source_table(&mut self, table_name: &str) -> Option<String> {
self.analyzer
.add_source_table(self.ctx, table_name, self.target_node.as_deref(), None)
}
pub fn add_source_table_with_alias(
&mut self,
table_name: &str,
alias: Option<&str>,
) -> Option<String> {
self.analyzer
.add_source_table(self.ctx, table_name, self.target_node.as_deref(), alias)
}
pub fn analyze_dml_target(
&mut self,
table_name: &str,
alias: Option<&TableAlias>,
) -> Option<(String, Arc<str>)> {
let canonical_res = self
.analyzer
.add_source_table(self.ctx, table_name, None, None);
let canonical = canonical_res
.clone()
.unwrap_or_else(|| self.analyzer.normalize_table_name(table_name));
if let (Some(a), Some(canonical_name)) = (alias, canonical_res) {
self.ctx
.table_aliases
.insert(a.name.to_string(), canonical_name);
}
let node_id = self
.ctx
.table_node_ids
.get(&canonical)
.cloned()
.unwrap_or_else(|| self.analyzer.relation_node_id(&canonical));
self.analyzer
.tracker
.record_produced(&canonical, self.ctx.statement_index);
self.analyzer
.add_table_columns_from_schema(self.ctx, &canonical, &node_id);
Some((canonical, node_id))
}
pub fn analyze_dml_target_factor(&mut self, table: &TableFactor) -> Option<Arc<str>> {
if let TableFactor::Table { name, alias, .. } = table {
let table_name = name.to_string();
self.analyze_dml_target(&table_name, alias.as_ref())
.map(|(_, node_id)| node_id)
} else {
self.visit_table_factor(table);
None
}
}
pub fn analyze_dml_target_from_table_with_joins(
&mut self,
table: &TableWithJoins,
) -> Option<Arc<str>> {
if let TableFactor::Table { name, alias, .. } = &table.relation {
let table_name = name.to_string();
self.analyze_dml_target(&table_name, alias.as_ref())
.map(|(_, node_id)| node_id)
} else {
self.visit_table_with_joins(table);
None
}
}
pub fn register_aliases_in_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
self.register_aliases_in_table_factor(&table_with_joins.relation);
for join in &table_with_joins.joins {
self.register_aliases_in_table_factor(&join.relation);
}
}
fn register_aliases_in_table_factor(&mut self, table_factor: &TableFactor) {
match table_factor {
TableFactor::Table {
name,
alias: Some(a),
..
} => {
let canonical = self
.analyzer
.canonicalize_table_reference(&name.to_string())
.canonical;
self.ctx.table_aliases.insert(a.name.to_string(), canonical);
}
TableFactor::Derived { alias: Some(a), .. } => {
self.ctx.subquery_aliases.insert(a.name.to_string());
}
TableFactor::NestedJoin {
table_with_joins, ..
} => {
self.register_aliases_in_table_with_joins(table_with_joins);
}
_ => {}
}
}
pub fn resolve_table_alias(&self, alias: Option<&str>) -> Option<String> {
self.analyzer.resolve_table_alias(self.ctx, alias)
}
pub(super) fn canonicalize_table_reference(&self, name: &str) -> super::TableResolution {
self.analyzer.canonicalize_table_reference(name)
}
fn extract_identifiers_from_expr(&mut self, expr: &Expr) {
match expr {
Expr::Identifier(ident) => {
self.try_add_identifier_as_table(std::slice::from_ref(ident));
}
Expr::CompoundIdentifier(idents) => {
self.try_add_identifier_as_table(idents);
}
Expr::Function(func) => {
if let ast::FunctionArguments::List(arg_list) = &func.args {
for arg in &arg_list.args {
if let ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) = arg {
self.extract_identifiers_from_expr(e);
}
}
}
}
Expr::BinaryOp { left, right, .. } => {
self.extract_identifiers_from_expr(left);
self.extract_identifiers_from_expr(right);
}
Expr::UnaryOp { expr, .. } => {
self.extract_identifiers_from_expr(expr);
}
Expr::Nested(e) => {
self.extract_identifiers_from_expr(e);
}
Expr::InList { expr, list, .. } => {
self.extract_identifiers_from_expr(expr);
for e in list {
self.extract_identifiers_from_expr(e);
}
}
Expr::Case {
operand,
conditions,
else_result,
..
} => {
if let Some(op) = operand {
self.extract_identifiers_from_expr(op);
}
for case_when in conditions {
self.extract_identifiers_from_expr(&case_when.condition);
self.extract_identifiers_from_expr(&case_when.result);
}
if let Some(else_r) = else_result {
self.extract_identifiers_from_expr(else_r);
}
}
_ => {}
}
}
fn try_add_identifier_as_table(&mut self, idents: &[Ident]) {
if idents.is_empty() {
return;
}
let name = idents
.iter()
.map(|i| i.value.as_str())
.collect::<Vec<_>>()
.join(".");
let resolution = self.analyzer.canonicalize_table_reference(&name);
if resolution.matched_schema {
self.add_source_table(&name);
}
}
fn emit_alias_warning(&mut self, clause_name: &str, alias_name: &str) {
let dialect = self.analyzer.request.dialect;
let statement_index = self.ctx.statement_index;
self.analyzer.issues.push(alias_visibility_warning(
dialect,
clause_name,
alias_name,
statement_index,
));
}
fn analyze_order_by(&mut self, order_by: &ast::OrderBy) {
let dialect = self.analyzer.request.dialect;
let order_exprs = match &order_by.kind {
ast::OrderByKind::Expressions(exprs) => exprs,
ast::OrderByKind::All(_) => return,
};
if !dialect.alias_in_order_by() {
for order_expr in order_exprs {
let identifiers = ExpressionAnalyzer::extract_simple_identifiers(&order_expr.expr);
for ident in &identifiers {
let normalized_ident = self.analyzer.normalize_identifier(ident);
if let Some(alias_name) = self
.ctx
.output_columns
.iter()
.find(|c| self.analyzer.normalize_identifier(&c.name) == normalized_ident)
.map(|c| c.name.clone())
{
self.emit_alias_warning("ORDER BY", &alias_name);
}
}
}
}
for order_expr in order_exprs {
let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
ea.analyze(&order_expr.expr);
}
}
}
impl<'a, 'b> Visitor for LineageVisitor<'a, 'b> {
fn visit_query(&mut self, query: &Query) {
if let Some(with) = &query.with {
let mut cte_ids: Vec<(String, Arc<str>)> = Vec::new();
for cte in &with.cte_tables {
let cte_name = cte.alias.name.to_string();
let cte_span = self.locate_cte_definition_span(&cte_name);
let body_span = cte_span.and_then(|span| {
let sql = if let Some(source) = &self.analyzer.current_statement_source {
source.sql.as_ref()
} else {
self.analyzer.request.sql.as_str()
};
find_cte_body_span(sql, span)
});
let cte_id = self.ctx.add_node(Node {
id: generate_statement_scoped_node_id(
"cte",
self.ctx.statement_index,
&cte_name,
),
node_type: NodeType::Cte,
label: cte_name.clone().into(),
qualified_name: Some(cte_name.clone().into()),
span: cte_span,
name_spans: cte_span.into_iter().collect(),
body_span,
..Default::default()
});
if let Some(span) = cte_span {
let cursor = self.ctx.relation_span_cursor(&cte_name);
*cursor = (*cursor).max(span.end);
}
self.ctx
.cte_definitions
.insert(cte_name.clone(), cte_id.clone());
self.ctx
.cte_node_to_name
.insert(cte_id.clone(), cte_name.clone());
self.analyzer.tracker.record_cte(&cte_name);
cte_ids.push((cte_name, cte_id));
}
for (cte, (_, cte_id)) in with.cte_tables.iter().zip(cte_ids.iter()) {
let projection_checkpoint = self.ctx.projection_checkpoint();
let mut cte_visitor =
LineageVisitor::new(self.analyzer, self.ctx, Some(cte_id.to_string()));
cte_visitor.visit_query(&cte.query);
let columns = self.ctx.take_output_columns_since(projection_checkpoint);
self.ctx
.register_cte_output_columns(cte.alias.name.to_string(), columns);
}
}
self.visit_set_expr(&query.body);
if let Some(order_by) = &query.order_by {
self.analyze_order_by(order_by);
}
}
fn visit_set_expr(&mut self, set_expr: &SetExpr) {
match set_expr {
SetExpr::Select(select) => self.visit_select(select),
SetExpr::Query(query) => self.visit_query(query),
SetExpr::SetOperation {
op, left, right, ..
} => {
let op_name = match op {
SetOperator::Union => "UNION",
SetOperator::Intersect => "INTERSECT",
SetOperator::Except => "EXCEPT",
SetOperator::Minus => "MINUS",
};
self.visit_set_expr(left);
self.visit_set_expr(right);
if self.target_node.is_some() {
self.ctx.last_operation = Some(op_name.to_string());
}
}
SetExpr::Values(values) => self.visit_values(values),
SetExpr::Insert(insert_stmt) => {
let Statement::Insert(insert) = insert_stmt else {
return;
};
let target_name = insert.table.to_string();
self.add_source_table(&target_name);
}
SetExpr::Table(tbl) => {
let name = tbl
.table_name
.as_ref()
.map(|n| n.to_string())
.unwrap_or_default();
if !name.is_empty() {
self.add_source_table(&name);
}
}
_ => {}
}
}
fn visit_select(&mut self, select: &Select) {
self.ctx.push_scope();
for table_with_joins in &select.from {
self.visit_table_with_joins(table_with_joins);
}
if self.analyzer.column_lineage_enabled {
let sink_node = self.ctx.sink_node_id().map(|node_id| node_id.to_string());
let target_node = self.target_node.clone().or(sink_node);
let mut select_analyzer = SelectAnalyzer::new(self.analyzer, self.ctx, target_node);
select_analyzer.analyze(select);
}
self.ctx.pop_scope();
}
fn visit_table_with_joins(&mut self, table_with_joins: &TableWithJoins) {
self.visit_table_factor(&table_with_joins.relation);
for join in &table_with_joins.joins {
let (join_type, join_condition) = Analyzer::convert_join_operator(&join.join_operator);
self.ctx.current_join_info.join_type = join_type;
self.ctx.current_join_info.join_condition = join_condition;
self.ctx.last_operation = Analyzer::join_type_to_operation(join_type);
self.visit_table_factor(&join.relation);
if let Some(expr) = Self::extract_join_constraint_expr(&join.join_operator) {
let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
ea.analyze(expr);
self.record_join_fk_relationships(expr);
}
self.ctx.current_join_info.join_type = None;
self.ctx.current_join_info.join_condition = None;
}
}
fn visit_table_factor(&mut self, table_factor: &TableFactor) {
match table_factor {
TableFactor::Table { name, alias, .. } => {
let table_name = name.to_string();
let alias_str = alias.as_ref().map(|a| a.name.to_string());
let canonical = self.add_source_table_with_alias(&table_name, alias_str.as_deref());
if let (Some(a), Some(canonical_name)) = (&alias_str, &canonical) {
self.ctx
.register_alias_in_scope(a.clone(), canonical_name.clone());
}
}
TableFactor::Derived {
subquery, alias, ..
} => {
let alias_name = alias.as_ref().map(|a| a.name.to_string());
let projection_checkpoint = self.ctx.projection_checkpoint();
let derived_span = alias_name
.as_ref()
.and_then(|name| self.locate_derived_alias_span(name));
let derived_node_id = alias_name.as_ref().map(|name| {
self.ctx.add_node(Node {
id: generate_statement_scoped_node_id(
"derived",
self.ctx.statement_index,
name,
),
node_type: NodeType::Cte,
label: name.clone().into(),
qualified_name: Some(name.clone().into()),
span: derived_span,
name_spans: derived_span.into_iter().collect(),
..Default::default()
})
});
if let (Some(name), Some(node_id)) = (alias_name.as_ref(), derived_node_id.as_ref())
{
self.ctx
.cte_node_to_name
.insert(node_id.clone(), name.clone());
}
let mut derived_visitor = LineageVisitor::new(
self.analyzer,
self.ctx,
derived_node_id.as_ref().map(|id| id.to_string()),
);
derived_visitor.visit_query(subquery);
let columns = self.ctx.take_output_columns_since(projection_checkpoint);
if let (Some(name), Some(node_id)) = (alias_name, derived_node_id) {
self.ctx
.register_table_in_scope(name.clone(), node_id.clone());
self.ctx.register_alias_in_scope(name.clone(), name.clone());
self.ctx.register_subquery_columns_in_scope(name, columns);
}
}
TableFactor::NestedJoin {
table_with_joins, ..
} => {
self.visit_table_with_joins(table_with_joins);
}
TableFactor::TableFunction { expr, alias, .. } => {
self.extract_identifiers_from_expr(expr);
let is_value_table = matches!(expr, Expr::Function(func) if is_value_table_function(
self.analyzer.request.dialect,
&func.name.to_string(),
));
if is_value_table {
self.ctx.mark_table_function_in_scope();
}
if let Some(a) = alias {
self.ctx
.register_subquery_alias_in_scope(a.name.to_string());
}
self.analyzer.issues.push(
Issue::info(
issue_codes::UNSUPPORTED_SYNTAX,
"Table function lineage extracted with best-effort identifier matching",
)
.with_statement(self.ctx.statement_index),
);
}
TableFactor::Pivot {
table,
aggregate_functions,
value_column,
value_source,
alias,
..
} => {
self.visit_table_factor(table);
for func in aggregate_functions {
self.extract_identifiers_from_expr(&func.expr);
}
for expr in value_column {
self.extract_identifiers_from_expr(expr);
}
match value_source {
ast::PivotValueSource::List(values) => {
for value in values {
self.extract_identifiers_from_expr(&value.expr);
}
}
ast::PivotValueSource::Any(_) => {}
ast::PivotValueSource::Subquery(q) => {
self.visit_query(q);
}
}
if let Some(a) = alias {
self.ctx
.register_subquery_alias_in_scope(a.name.to_string());
}
self.analyzer.issues.push(
Issue::warning(
issue_codes::UNSUPPORTED_SYNTAX,
"PIVOT lineage extracted with best-effort identifier matching",
)
.with_statement(self.ctx.statement_index),
);
}
TableFactor::Unpivot {
table,
columns,
alias,
..
} => {
self.visit_table_factor(table);
for col in columns {
self.extract_identifiers_from_expr(&col.expr);
}
if let Some(a) = alias {
self.ctx
.register_subquery_alias_in_scope(a.name.to_string());
}
self.analyzer.issues.push(
Issue::warning(
issue_codes::UNSUPPORTED_SYNTAX,
"UNPIVOT lineage extracted with best-effort identifier matching",
)
.with_statement(self.ctx.statement_index),
);
}
TableFactor::UNNEST {
array_exprs, alias, ..
} => {
for expr in array_exprs {
let mut ea = ExpressionAnalyzer::new(self.analyzer, self.ctx);
let column_refs = ea.extract_column_refs_with_warning(expr);
for col_ref in &column_refs {
if let Some(table_canonical) = self.analyzer.resolve_column_table(
self.ctx,
col_ref.table.as_deref(),
&col_ref.column,
) {
self.add_source_table(&table_canonical);
}
}
}
if let Some(a) = alias {
self.ctx
.register_subquery_alias_in_scope(a.name.to_string());
}
}
_ => {}
}
}
fn visit_values(&mut self, values: &Values) {
let mut expr_analyzer = ExpressionAnalyzer::new(self.analyzer, self.ctx);
for row in &values.rows {
for expr in row {
expr_analyzer.analyze(expr);
}
}
}
}