use super::complexity;
use super::context::StatementContext;
use super::expression::ExpressionAnalyzer;
use super::helpers::{
classify_query_type, extract_simple_name, generate_column_node_id, generate_edge_id,
generate_node_id, split_qualified_identifiers,
};
use super::visitor::{LineageVisitor, Visitor};
use super::Analyzer;
use crate::error::ParseError;
use crate::types::{
issue_codes, Edge, EdgeType, Issue, JoinType, Node, NodeType, Span, StatementLineage,
};
use regex::Regex;
use sqlparser::ast::{
self, AlterTableOperation, Assignment, AssignmentTarget, CopyIntoSnowflakeKind, CopySource,
CopyTarget, Expr, FromTable, MergeAction, MergeClause, MergeInsertKind, ObjectName,
RenameTableNameKind, Statement, TableFactor, TableWithJoins, UpdateTableFromKind,
};
use std::collections::{HashMap, HashSet};
use std::ops::Range;
use std::sync::{Arc, LazyLock};
#[cfg(feature = "tracing")]
use tracing::{info, info_span};
#[cfg(feature = "templating")]
use crate::templater::TemplateMode;
/// Converts an optional byte range into a `Span`, rejecting absent or
/// inverted (`start > end`) ranges.
fn span_from_range(range: Option<&Range<usize>>) -> Option<Span> {
    match range {
        Some(r) if r.start <= r.end => Some(Span::new(r.start, r.end)),
        _ => None,
    }
}
/// Per-table join metadata collected from `joined_table_info`, used when
/// emitting fallback `JoinDependency` edges toward the statement sink.
struct JoinNodeInfo {
    /// Graph node id of the joined table.
    node_id: Arc<str>,
    /// Join kind (inner/left/...), when known.
    join_type: Option<JoinType>,
    /// Rendered join condition, when known.
    join_condition: Option<Arc<str>>,
}
/// Source-text bookkeeping for one analyzed statement.
pub(super) struct StatementSource {
    /// Byte range of the statement in the analyzed SQL text.
    pub source_range: Range<usize>,
    /// Byte range in the original text, when it differs (e.g. after
    /// template resolution shifted offsets).
    pub original_source_range: Option<Range<usize>>,
    /// Original statement text when available; used to scan for dbt
    /// `{{ config(...) }}` materialization hints.
    pub original_sql: Option<String>,
    /// Statement text after template resolution, when available.
    pub resolved_sql: Option<String>,
}
impl<'a> Analyzer<'a> {
/// Builds the lineage graph for a single parsed SQL statement.
///
/// Dispatches on the statement kind, runs the appropriate `analyze_*`
/// routine against a fresh `StatementContext`, applies the shared
/// post-processing passes (filters, inferred columns, join edges), and
/// returns the assembled `StatementLineage`.
///
/// In dbt mode a top-level `Query` statement is treated as a model body:
/// the model file name becomes the sink relation, typed according to the
/// model's `materialized` config found in `original_sql`.
#[cfg_attr(feature = "tracing", tracing::instrument(skip(self, statement, source), fields(index, name = source_name.as_deref())))]
pub(super) fn analyze_statement(
    &mut self,
    index: usize,
    statement: &Statement,
    source_name: Option<String>,
    source: StatementSource,
) -> Result<StatementLineage, ParseError> {
    let StatementSource {
        source_range,
        original_source_range,
        original_sql,
        resolved_sql,
    } = source;
    let mut ctx = StatementContext::new(index);
    let statement_type = match statement {
        Statement::Query(query) => {
            // In dbt mode the file path names the model; otherwise a bare
            // SELECT has no named sink.
            let model_name = if self.is_dbt_mode() {
                source_name.as_ref().map(|path| extract_model_name(path))
            } else {
                None
            };
            let normalized_model_name = model_name.map(|n| self.normalize_table_name(n));
            // Materialization decides whether the model relation is a
            // table, view, or ephemeral CTE.
            let model_node_type = dbt_model_relation_type(original_sql.as_deref());
            let sink_target_id: Option<Arc<str>> = if let Some(ref name) = normalized_model_name
            {
                match model_node_type {
                    NodeType::View => self.tracker.record_view_produced(name, index),
                    NodeType::Table => self.tracker.record_produced(name, index),
                    NodeType::Cte => self.tracker.declare_ephemeral(name),
                    NodeType::Output | NodeType::Column => {
                        unreachable!("dbt model sinks must be relation-like")
                    }
                }
                let (canonical_id, node_type) = self.tracker.relation_identity(name);
                // Prefer the original (pre-resolution) span when present.
                let producer_span = span_from_range(original_source_range.as_ref())
                    .or_else(|| span_from_range(Some(&source_range)));
                Some(ctx.ensure_model_relation_sink(
                    name,
                    canonical_id,
                    node_type,
                    producer_span,
                ))
            } else {
                ctx.ensure_output_node_with_model(None);
                None
            };
            self.analyze_query(&mut ctx, query, sink_target_id.as_deref());
            classify_query_type(query)
        }
        Statement::Insert(insert) => {
            self.analyze_insert(&mut ctx, insert);
            "INSERT".to_string()
        }
        Statement::CreateTable(create) => {
            // CTAS carries a query (source lineage); plain CREATE TABLE
            // only declares columns.
            if let Some(query) = &create.query {
                self.analyze_create_table_as(&mut ctx, &create.name, query, create.temporary);
                "CREATE_TABLE_AS".to_string()
            } else {
                self.analyze_create_table(
                    &mut ctx,
                    &create.name,
                    &create.columns,
                    &create.constraints,
                    create.temporary,
                );
                "CREATE_TABLE".to_string()
            }
        }
        Statement::CreateView(create_view) => {
            self.analyze_create_view(
                &mut ctx,
                &create_view.name,
                &create_view.query,
                &create_view.columns,
                create_view.temporary,
            );
            "CREATE_VIEW".to_string()
        }
        Statement::Update(update) => {
            self.analyze_update(
                &mut ctx,
                &update.table,
                &update.assignments,
                &update.from,
                &update.selection,
            );
            "UPDATE".to_string()
        }
        Statement::Delete(delete) => {
            self.analyze_delete(
                &mut ctx,
                &delete.tables,
                &delete.from,
                &delete.using,
                &delete.selection,
            );
            "DELETE".to_string()
        }
        Statement::Merge(merge) => {
            self.analyze_merge(
                &mut ctx,
                merge.into,
                &merge.table,
                &merge.source,
                &merge.on,
                &merge.clauses,
            );
            "MERGE".to_string()
        }
        Statement::Drop {
            object_type, names, ..
        } => {
            self.analyze_drop(&mut ctx, object_type, names);
            "DROP".to_string()
        }
        Statement::AlterTable(alter_table) => {
            self.analyze_alter_table(&mut ctx, &alter_table.name, &alter_table.operations);
            "ALTER_TABLE".to_string()
        }
        // The statements below are classified but contribute no lineage.
        Statement::AlterView { .. } => "ALTER_VIEW".to_string(),
        Statement::AlterIndex { .. } => "ALTER_INDEX".to_string(),
        Statement::AlterSchema(_) => "ALTER_SCHEMA".to_string(),
        Statement::AlterRole { .. } => "ALTER_ROLE".to_string(),
        Statement::Grant { .. } => "GRANT".to_string(),
        Statement::Revoke { .. } => "REVOKE".to_string(),
        Statement::Set(_) => "SET".to_string(),
        Statement::ShowVariable { .. } | Statement::ShowVariables { .. } => "SHOW".to_string(),
        Statement::Truncate { .. } => "TRUNCATE".to_string(),
        Statement::Comment { .. } => "COMMENT".to_string(),
        Statement::Explain { .. } | Statement::ExplainTable { .. } => "EXPLAIN".to_string(),
        Statement::Analyze { .. } => "ANALYZE".to_string(),
        Statement::Call(_) => "CALL".to_string(),
        Statement::Use(_) => "USE".to_string(),
        Statement::StartTransaction { .. }
        | Statement::Commit { .. }
        | Statement::Rollback { .. }
        | Statement::Savepoint { .. } => "TRANSACTION".to_string(),
        Statement::CreateIndex(_) => "CREATE_INDEX".to_string(),
        Statement::CreateSchema { .. } => "CREATE_SCHEMA".to_string(),
        Statement::CreateDatabase { .. } => "CREATE_DATABASE".to_string(),
        Statement::CreateRole { .. } => "CREATE_ROLE".to_string(),
        Statement::CreateFunction { .. } => "CREATE_FUNCTION".to_string(),
        Statement::CreateProcedure { .. } => "CREATE_PROCEDURE".to_string(),
        Statement::CreateTrigger { .. } => "CREATE_TRIGGER".to_string(),
        Statement::CreateType { .. } => "CREATE_TYPE".to_string(),
        Statement::CreateSequence { .. } => "CREATE_SEQUENCE".to_string(),
        Statement::CreateExtension { .. } => "CREATE_EXTENSION".to_string(),
        Statement::DropFunction { .. } => "DROP_FUNCTION".to_string(),
        Statement::DropProcedure { .. } => "DROP_PROCEDURE".to_string(),
        Statement::DropTrigger { .. } => "DROP_TRIGGER".to_string(),
        Statement::Copy {
            source, to, target, ..
        } => {
            self.analyze_copy(&mut ctx, source, *to, target);
            "COPY".to_string()
        }
        Statement::CopyIntoSnowflake {
            kind,
            into,
            from_obj,
            from_query,
            ..
        } => {
            self.analyze_copy_into_snowflake(&mut ctx, kind, into, from_obj, from_query);
            "COPY".to_string()
        }
        Statement::Unload {
            query, query_text, ..
        } => {
            self.analyze_unload(&mut ctx, query, query_text);
            "UNLOAD".to_string()
        }
        _ => {
            // Unknown statement kind: surface a warning rather than fail.
            self.issues.push(
                Issue::warning(
                    issue_codes::UNSUPPORTED_SYNTAX,
                    "Statement type not fully supported for lineage analysis",
                )
                .with_statement(index),
            );
            "UNKNOWN".to_string()
        }
    };
    // Shared post-processing passes over the assembled graph.
    self.apply_pending_filters(&mut ctx);
    self.propagate_inferred_columns(&mut ctx);
    self.add_join_dependency_edges(&mut ctx);
    Self::propagate_join_info_to_edges(&mut ctx);
    self.register_source_tables_schema(&ctx);
    if ctx.instance_limit_reached {
        let mut issue = Issue::warning(
            issue_codes::MEMORY_LIMIT_EXCEEDED,
            "Alias instance limit reached; lineage for some self-join aliases may be incomplete",
        );
        issue.statement_index = Some(index);
        self.issues.push(issue);
    }
    let join_count = complexity::count_joins(&ctx.joined_table_info);
    let complexity_score = complexity::calculate_complexity(&ctx.nodes, &ctx.joined_table_info);
    Ok(StatementLineage {
        statement_index: index,
        statement_type,
        source_name,
        nodes: ctx.nodes,
        edges: ctx.edges,
        span: Some(Span::new(source_range.start, source_range.end)),
        join_count,
        complexity_score,
        resolved_sql,
    })
}
/// Adds `JoinDependency` edges from joined tables to the statement sink
/// for tables that participate in a join but contribute no column-level
/// data flow to the output, so they still appear connected in the graph.
///
/// Fix: the owned-column list is now borrowed from `table_columns`
/// instead of cloning a `Vec<Arc<str>>` per joined table.
fn add_join_dependency_edges(&self, ctx: &mut StatementContext) {
    // Without a sink there is nothing to attach dependency edges to.
    let sink_node_id = match ctx.sink_node_id.as_ref() {
        Some(node_id) => node_id.clone(),
        None => return,
    };
    // Columns owned by the sink, used to detect column-level contributions.
    let output_column_ids: HashSet<_> = if self.column_lineage_enabled {
        ctx.edges
            .iter()
            .filter(|edge| edge.edge_type == EdgeType::Ownership && edge.from == sink_node_id)
            .map(|edge| edge.to.clone())
            .collect()
    } else {
        HashSet::new()
    };
    if self.column_lineage_enabled {
        let has_direct_output_lineage = ctx.edges.iter().any(|edge| {
            matches!(edge.edge_type, EdgeType::DataFlow | EdgeType::Derivation)
                && edge.to == sink_node_id
        });
        // No lineage reaches the sink at all: nothing to supplement.
        if output_column_ids.is_empty() && !has_direct_output_lineage {
            return;
        }
    }
    // Table node id -> ids of the columns it owns (from Ownership edges).
    let mut table_columns: HashMap<Arc<str>, Vec<Arc<str>>> = HashMap::new();
    if self.column_lineage_enabled {
        for edge in &ctx.edges {
            if edge.edge_type == EdgeType::Ownership {
                table_columns
                    .entry(edge.from.clone())
                    .or_default()
                    .push(edge.to.clone());
            }
        }
    }
    // Snapshot join info so we can mutate `ctx.edges` afterwards.
    let join_nodes: Vec<JoinNodeInfo> = ctx
        .nodes
        .iter()
        .filter(|node| {
            node.node_type.is_table_like() && ctx.joined_table_info.contains_key(&node.id)
        })
        .filter_map(|node| {
            let info = ctx.joined_table_info.get(&node.id)?;
            Some(JoinNodeInfo {
                node_id: node.id.clone(),
                join_type: info.join_type,
                join_condition: info.join_condition.as_deref().map(Into::into),
            })
        })
        .collect();
    for join_info in join_nodes {
        let JoinNodeInfo {
            node_id,
            join_type,
            join_condition,
        } = join_info;
        let contributes_to_output = if self.column_lineage_enabled {
            // Borrow the owned-column list instead of cloning it per table.
            let owned_columns: &[Arc<str>] =
                table_columns.get(&node_id).map_or(&[], Vec::as_slice);
            ctx.edges.iter().any(|edge| {
                matches!(edge.edge_type, EdgeType::DataFlow | EdgeType::Derivation)
                    && (edge.from == node_id
                        || owned_columns.iter().any(|col| col == &edge.from))
                    && (edge.to == sink_node_id || output_column_ids.contains(&edge.to))
            })
        } else {
            false
        };
        // Tables that already feed the output need no dependency edge.
        if contributes_to_output {
            continue;
        }
        // Deterministic edge id keyed by table, join type, and condition.
        let edge_key = format!(
            "join_dependency:{node_id}:{join_type:?}:{}",
            join_condition.as_deref().unwrap_or("")
        );
        let edge_id = generate_edge_id(&edge_key, sink_node_id.as_ref());
        if ctx.edge_ids.contains(&edge_id) {
            continue;
        }
        ctx.add_edge(Edge {
            id: edge_id,
            from: node_id,
            to: sink_node_id.clone(),
            edge_type: EdgeType::JoinDependency,
            expression: None,
            operation: None,
            join_type,
            join_condition,
            metadata: None,
            approximate: None,
            statement_ids: Vec::new(),
        });
    }
}
/// Copies join type/condition metadata from `joined_table_info` onto
/// data-flow edges whose source is a joined table (or a column owned by
/// one), so downstream consumers see join context on those edges.
fn propagate_join_info_to_edges(ctx: &mut StatementContext) {
    if ctx.joined_table_info.is_empty() {
        return;
    }
    // Column node id -> owning table node id, built from Ownership edges.
    let mut column_to_table: HashMap<Arc<str>, Arc<str>> = HashMap::new();
    // Indices of edges that still lack join info; indices are collected
    // first so the edges can be mutated in a second pass.
    let mut candidate_indices: Vec<usize> = Vec::new();
    for (i, edge) in ctx.edges.iter().enumerate() {
        match edge.edge_type {
            EdgeType::Ownership => {
                column_to_table.insert(edge.to.clone(), edge.from.clone());
            }
            EdgeType::DataFlow | EdgeType::Derivation | EdgeType::JoinDependency
                if edge.join_type.is_none() =>
            {
                candidate_indices.push(i);
            }
            _ => {}
        }
    }
    for i in candidate_indices {
        let edge = &ctx.edges[i];
        // The edge source is either a joined table itself or a column
        // belonging to one.
        let source_table_id = if ctx.joined_table_info.contains_key(&edge.from) {
            Some(edge.from.clone())
        } else {
            column_to_table
                .get(&edge.from)
                .filter(|table_id| ctx.joined_table_info.contains_key(*table_id))
                .cloned()
        };
        if let Some(info) = source_table_id.and_then(|id| ctx.joined_table_info.get(&id)) {
            let edge = &mut ctx.edges[i];
            edge.join_type = info.join_type;
            // Keep any condition already present on the edge.
            if edge.join_condition.is_none() {
                edge.join_condition = info.join_condition.as_deref().map(Into::into);
            }
        }
    }
}
/// Records lineage for an INSERT: the target table node plus any lineage
/// flowing from the INSERT's source query/VALUES into that target.
pub(super) fn analyze_insert(&mut self, ctx: &mut StatementContext, insert: &ast::Insert) {
    let target_name = insert.table.to_string();
    let canonical = self.normalize_table_name(&target_name);
    let target_label = extract_simple_name(&target_name);
    let target_id = ctx.add_node(Node {
        id: generate_node_id("table", &canonical),
        node_type: NodeType::Table,
        // `target_label` is used only here; move it instead of cloning.
        label: target_label.into(),
        qualified_name: Some(canonical.clone().into()),
        ..Default::default()
    });
    if let Some(span) = self.locate_relation_name_span(ctx, &target_name) {
        ctx.add_name_span(&target_id, span);
    }
    self.tracker
        .record_produced(&canonical, ctx.statement_index);
    // Route lineage from the INSERT body into the target relation.
    if let Some(ref source_body) = insert.source {
        self.analyze_query(ctx, source_body, Some(&target_id));
    }
}
/// Analyzes an UPDATE statement: resolves the target relation, visits
/// source relations from FROM/JOIN clauses, creates nodes for assigned
/// columns, and runs expression analysis over assignments and WHERE.
pub(super) fn analyze_update(
    &mut self,
    ctx: &mut StatementContext,
    table: &TableWithJoins,
    assignments: &[Assignment],
    from: &Option<UpdateTableFromKind>,
    selection: &Option<Expr>,
) {
    // Resolve the write target first with no target set on the visitor.
    let target_node_id = {
        let mut visitor = LineageVisitor::new(self, ctx, None);
        visitor.analyze_dml_target_from_table_with_joins(table)
    };
    {
        // Re-create the visitor with the target wired in so the sources
        // visited below produce edges toward it.
        let target = LineageVisitor::target_from_arc(target_node_id.as_ref());
        let mut visitor = LineageVisitor::new(self, ctx, target);
        if let Some(from_kind) = from {
            match from_kind {
                UpdateTableFromKind::BeforeSet(tables) => {
                    for t in tables {
                        visitor.visit_table_with_joins(t);
                    }
                }
                UpdateTableFromKind::AfterSet(tables) => {
                    for t in tables {
                        visitor.visit_table_with_joins(t);
                    }
                }
            }
        }
        // Joins attached directly to the target (UPDATE t JOIN u ...).
        for join in &table.joins {
            visitor.set_last_operation(Some("JOIN".to_string()));
            visitor.visit_table_factor(&join.relation);
        }
    }
    // Materialize a column node (plus ownership edge) per assigned column.
    for assignment in assignments {
        if let Some(col_name) = self.extract_assignment_target_name(assignment) {
            Self::add_target_column_node(ctx, target_node_id.as_deref(), &col_name);
        }
    }
    let mut expr_analyzer = ExpressionAnalyzer::new(self, ctx);
    for assignment in assignments {
        Self::analyze_assignment_targets(&mut expr_analyzer, assignment);
        expr_analyzer.analyze(&assignment.value);
    }
    if let Some(expr) = selection {
        expr_analyzer.analyze(expr);
    }
}
/// Analyzes a DELETE statement: registers aliases from FROM/USING,
/// resolves the deletion target(s), records source relations, and runs
/// expression analysis over the WHERE clause.
pub(super) fn analyze_delete(
    &mut self,
    ctx: &mut StatementContext,
    tables: &[ObjectName],
    from: &FromTable,
    using: &Option<Vec<TableWithJoins>>,
    selection: &Option<Expr>,
) {
    let mut target_ids: Vec<Arc<str>> = Vec::new();
    {
        let mut visitor = LineageVisitor::new(self, ctx, None);
        // Register aliases first so explicit targets may be alias names.
        match from {
            FromTable::WithFromKeyword(ts) | FromTable::WithoutKeyword(ts) => {
                for t in ts {
                    visitor.register_aliases_in_table_with_joins(t);
                }
            }
        }
        if let Some(us) = using {
            for t in us {
                visitor.register_aliases_in_table_with_joins(t);
            }
        }
        if !tables.is_empty() {
            // Multi-table DELETE: targets are listed explicitly.
            for obj in tables {
                let name = obj.to_string();
                let target_canonical = visitor
                    .resolve_table_alias(Some(&name))
                    .unwrap_or_else(|| visitor.canonicalize_table_reference(&name).canonical);
                if let Some((_canonical, node_id)) =
                    visitor.analyze_dml_target(&target_canonical, None)
                {
                    #[cfg(feature = "tracing")]
                    info!(target: "analyzer", "DELETE target identified: {} (ID: {})", _canonical, node_id);
                    target_ids.push(node_id);
                }
            }
        } else {
            // Single-table DELETE: the first FROM relation is the target.
            let ts = match from {
                FromTable::WithFromKeyword(ts) | FromTable::WithoutKeyword(ts) => ts,
            };
            if let Some(first) = ts.first() {
                if let TableFactor::Table { name, alias, .. } = &first.relation {
                    let name_str = name.to_string();
                    if let Some((_canonical, node_id)) =
                        visitor.analyze_dml_target(&name_str, alias.as_ref())
                    {
                        #[cfg(feature = "tracing")]
                        info!(target: "analyzer", "DELETE target identified: {} (ID: {})", _canonical, node_id);
                        target_ids.push(node_id);
                    }
                }
            }
        }
    }
    // Every relation list that can contribute source lineage.
    let sources: Vec<&[TableWithJoins]> = {
        let from_tables = match from {
            FromTable::WithFromKeyword(ts) | FromTable::WithoutKeyword(ts) => ts.as_slice(),
        };
        let mut sources = vec![from_tables];
        if let Some(us) = using {
            sources.push(us.as_slice());
        }
        sources
    };
    if target_ids.is_empty() {
        // No resolvable target: still record the source relations.
        let mut visitor = LineageVisitor::new(self, ctx, None);
        for ts in sources {
            for t in ts {
                visitor.visit_table_with_joins(t);
            }
        }
    } else {
        // One visiting pass per target so each receives source edges.
        for target_id in &target_ids {
            let mut visitor = LineageVisitor::new(self, ctx, Some(target_id.to_string()));
            for ts in &sources {
                for t in *ts {
                    visitor.visit_table_with_joins(t);
                }
            }
        }
    }
    if let Some(expr) = selection {
        let mut expr_analyzer = ExpressionAnalyzer::new(self, ctx);
        expr_analyzer.analyze(expr);
    }
}
/// Analyzes a MERGE statement: target and source relations, columns
/// written by UPDATE/INSERT actions, and every referenced expression
/// (ON condition, assignments, VALUES rows, clause predicates).
pub(super) fn analyze_merge(
    &mut self,
    ctx: &mut StatementContext,
    _into: bool,
    table: &TableFactor,
    source: &TableFactor,
    on: &Expr,
    clauses: &[MergeClause],
) {
    let mut visitor = LineageVisitor::new(self, ctx, None);
    let target_id = visitor.analyze_dml_target_factor(table);
    // Wire the target in before visiting the source so edges point at it.
    visitor.set_target_node(LineageVisitor::target_from_arc(target_id.as_ref()));
    visitor.visit_table_factor(source);
    // First pass: create column nodes for every written column.
    for clause in clauses {
        match &clause.action {
            MergeAction::Update(update_expr) => {
                for assignment in &update_expr.assignments {
                    if let Some(col_name) = self.extract_assignment_target_name(assignment) {
                        Self::add_target_column_node(ctx, target_id.as_deref(), &col_name);
                    }
                }
            }
            MergeAction::Insert(insert_expr) => {
                for col in &insert_expr.columns {
                    // Use the last identifier of a (possibly qualified) name.
                    if let Some(ident) = col.0.iter().filter_map(|p| p.as_ident()).next_back() {
                        let col_name = self.normalize_identifier(&ident.value);
                        Self::add_target_column_node(ctx, target_id.as_deref(), &col_name);
                    }
                }
            }
            MergeAction::Delete { .. } => {}
        }
    }
    // Second pass: expression analysis over all referenced expressions.
    let mut expr_analyzer = ExpressionAnalyzer::new(self, ctx);
    expr_analyzer.analyze(on);
    for clause in clauses {
        match &clause.action {
            MergeAction::Update(update_expr) => {
                for assignment in &update_expr.assignments {
                    Self::analyze_assignment_targets(&mut expr_analyzer, assignment);
                    expr_analyzer.analyze(&assignment.value);
                }
            }
            MergeAction::Insert(insert_expr) => {
                for col in &insert_expr.columns {
                    Self::analyze_object_name_as_column(&mut expr_analyzer, col);
                }
                match &insert_expr.kind {
                    MergeInsertKind::Values(values) => {
                        for row in &values.rows {
                            for value in row {
                                expr_analyzer.analyze(value);
                            }
                        }
                    }
                    MergeInsertKind::Row => {}
                }
            }
            MergeAction::Delete { .. } => {}
        }
        if let Some(ref predicate) = clause.predicate {
            expr_analyzer.analyze(predicate);
        }
    }
}
/// Routes every column named on the left-hand side of an assignment
/// (single column or tuple target) through column-expression analysis.
fn analyze_assignment_targets(
    expr_analyzer: &mut ExpressionAnalyzer<'_, '_>,
    assignment: &Assignment,
) {
    match &assignment.target {
        AssignmentTarget::ColumnName(name) => {
            Self::analyze_object_name_as_column(expr_analyzer, name);
        }
        AssignmentTarget::Tuple(names) => {
            for name in names {
                Self::analyze_object_name_as_column(expr_analyzer, name);
            }
        }
    }
}
/// Adds a column node for a written target column and, when the owning
/// relation is known, an Ownership edge from that relation to the column.
fn add_target_column_node(ctx: &mut StatementContext, target_id: Option<&str>, col_name: &str) {
    let col_node_id = generate_column_node_id(target_id, col_name);
    ctx.add_node(Node {
        id: col_node_id.clone(),
        node_type: NodeType::Column,
        label: col_name.into(),
        ..Default::default()
    });
    // Without a known parent relation there is no ownership edge to add.
    let Some(tid) = target_id else {
        return;
    };
    let edge_id = generate_edge_id(tid, &col_node_id);
    if ctx.edge_ids.contains(&edge_id) {
        return;
    }
    ctx.add_edge(Edge {
        id: edge_id,
        from: tid.into(),
        to: col_node_id,
        edge_type: EdgeType::Ownership,
        expression: None,
        operation: None,
        join_type: None,
        join_condition: None,
        metadata: None,
        approximate: None,
        statement_ids: Vec::new(),
    });
}
/// Returns the normalized column name of a single-column assignment
/// target; tuple targets and non-identifier tails yield `None`.
fn extract_assignment_target_name(&self, assignment: &Assignment) -> Option<String> {
    if let AssignmentTarget::ColumnName(name) = &assignment.target {
        let ident = name.0.last()?.as_ident()?;
        Some(self.normalize_identifier(&ident.value))
    } else {
        None
    }
}
/// Feeds an object name into the expression analyzer as a column
/// reference: plain identifier when unqualified, compound otherwise.
fn analyze_object_name_as_column(
    expr_analyzer: &mut ExpressionAnalyzer<'_, '_>,
    name: &ObjectName,
) {
    let mut idents: Vec<_> = name
        .0
        .iter()
        .filter_map(|p| p.as_ident().cloned())
        .collect();
    if idents.is_empty() {
        return;
    }
    let expr = if idents.len() == 1 {
        Expr::Identifier(idents.pop().expect("idents is non-empty"))
    } else {
        Expr::CompoundIdentifier(idents)
    };
    expr_analyzer.analyze(&expr);
}
/// Handles DROP TABLE / DROP VIEW by forgetting the dropped relations in
/// implied-schema and tracker state; other object types are ignored.
pub(super) fn analyze_drop(
    &mut self,
    _ctx: &mut StatementContext,
    object_type: &ast::ObjectType,
    names: &[ObjectName],
) {
    // Same short-circuit order as `a && b`.
    if !self.allow_implied() {
        return;
    }
    if !matches!(object_type, ast::ObjectType::Table | ast::ObjectType::View) {
        return;
    }
    for name in names {
        let table_name = name.to_string();
        let canonical = self.normalize_table_name(&table_name);
        self.schema.remove_implied(&canonical);
        self.tracker.remove(&canonical);
    }
}
/// Records lineage for a PostgreSQL-style `COPY` statement.
///
/// `COPY <table> TO ...` reads the table (consumed); `COPY <table> FROM
/// ...` writes into it (produced). A `COPY (query) ...` form analyzes
/// the query with no sink.
pub(super) fn analyze_copy(
    &mut self,
    ctx: &mut StatementContext,
    source: &CopySource,
    to: bool,
    _target: &CopyTarget,
) {
    match source {
        CopySource::Table { table_name, .. } => {
            let name = table_name.to_string();
            let canonical = self.normalize_table_name(&name);
            let node_id = generate_node_id("table", &canonical);
            let label = extract_simple_name(&name);
            ctx.add_node(Node {
                id: node_id.clone(),
                node_type: NodeType::Table,
                // `label` is not used again; move it instead of cloning.
                label: label.into(),
                qualified_name: Some(canonical.clone().into()),
                ..Default::default()
            });
            if let Some(span) = self.locate_relation_name_span(ctx, &name) {
                ctx.add_name_span(&node_id, span);
            }
            if to {
                // COPY <table> TO <target>: the table is a source.
                self.tracker
                    .record_consumed(&canonical, ctx.statement_index);
            } else {
                // COPY <table> FROM <target>: the table is a sink.
                self.tracker
                    .record_produced(&canonical, ctx.statement_index);
            }
        }
        CopySource::Query(query) => {
            self.analyze_query(ctx, query, None);
        }
    }
}
/// Records lineage for Snowflake `COPY INTO`.
///
/// `COPY INTO <table>` treats the table as a produced sink (optionally
/// fed by a transformation query); `COPY INTO <location>` treats the
/// FROM relation or query as a consumed source being exported.
pub(super) fn analyze_copy_into_snowflake(
    &mut self,
    ctx: &mut StatementContext,
    kind: &CopyIntoSnowflakeKind,
    into: &ObjectName,
    from_obj: &Option<ObjectName>,
    from_query: &Option<Box<ast::Query>>,
) {
    match kind {
        CopyIntoSnowflakeKind::Table => {
            let name = into.to_string();
            let canonical = self.normalize_table_name(&name);
            let target_id = generate_node_id("table", &canonical);
            let label = extract_simple_name(&name);
            ctx.add_node(Node {
                id: target_id.clone(),
                node_type: NodeType::Table,
                // `label` is not used again; move it instead of cloning.
                label: label.into(),
                qualified_name: Some(canonical.clone().into()),
                ..Default::default()
            });
            if let Some(span) = self.locate_relation_name_span(ctx, &name) {
                ctx.add_name_span(&target_id, span);
            }
            self.tracker
                .record_produced(&canonical, ctx.statement_index);
            // COPY INTO <table> FROM (SELECT ...): query feeds the target.
            if let Some(query) = from_query {
                self.analyze_query(ctx, query, Some(&target_id));
            }
        }
        CopyIntoSnowflakeKind::Location => {
            if let Some(query) = from_query {
                self.analyze_query(ctx, query, None);
            } else if let Some(table_name) = from_obj {
                let name = table_name.to_string();
                let canonical = self.normalize_table_name(&name);
                let node_id = generate_node_id("table", &canonical);
                let label = extract_simple_name(&name);
                ctx.add_node(Node {
                    id: node_id.clone(),
                    node_type: NodeType::Table,
                    // `label` is not used again; move it instead of cloning.
                    label: label.into(),
                    qualified_name: Some(canonical.clone().into()),
                    ..Default::default()
                });
                if let Some(span) = self.locate_relation_name_span(ctx, &name) {
                    ctx.add_name_span(&node_id, span);
                }
                self.tracker
                    .record_consumed(&canonical, ctx.statement_index);
            }
        }
    }
}
/// Scans ALTER TABLE operations; only RENAME operations carry lineage
/// meaning here, all other operations are ignored.
pub(super) fn analyze_alter_table(
    &mut self,
    ctx: &mut StatementContext,
    old_name: &ObjectName,
    operations: &[AlterTableOperation],
) {
    for op in operations {
        match op {
            AlterTableOperation::RenameTable { table_name } => {
                self.analyze_rename_table(ctx, old_name, table_name);
            }
            _ => {}
        }
    }
}
/// Handles `ALTER TABLE ... RENAME TO/AS ...`: emits nodes for both
/// relations plus a RENAME data-flow edge, and updates consumed/produced
/// tracking (old relation consumed, new relation produced).
fn analyze_rename_table(
    &mut self,
    ctx: &mut StatementContext,
    old_name: &ObjectName,
    new_name: &RenameTableNameKind,
) {
    let new_table_name = match new_name {
        RenameTableNameKind::To(name) | RenameTableNameKind::As(name) => name,
    };
    let old_name_str = old_name.to_string();
    let old_canonical = self.normalize_table_name(&old_name_str);
    let old_node_id = generate_node_id("table", &old_canonical);
    let new_name_str = new_table_name.to_string();
    // An unqualified new name inherits the old name's qualifiers
    // (e.g. `ALTER TABLE s.a RENAME TO b` targets `s.b`).
    let mut inherited_parts = split_qualified_identifiers(&old_name_str);
    let new_parts = split_qualified_identifiers(&new_name_str);
    let new_name_with_schema = if new_parts.len() == 1 && inherited_parts.len() > 1 {
        inherited_parts.pop();
        inherited_parts.push(new_name_str.clone());
        inherited_parts.join(".")
    } else {
        new_name_str.clone()
    };
    let new_canonical = self.normalize_table_name(&new_name_with_schema);
    let new_node_id = generate_node_id("table", &new_canonical);
    let old_label = extract_simple_name(&old_name_str);
    ctx.add_node(Node {
        id: old_node_id.clone(),
        node_type: NodeType::Table,
        // Labels are each used exactly once; move instead of cloning.
        label: old_label.into(),
        qualified_name: Some(old_canonical.clone().into()),
        ..Default::default()
    });
    if let Some(span) = self.locate_relation_name_span(ctx, &old_name_str) {
        ctx.add_name_span(&old_node_id, span);
    }
    let new_label = extract_simple_name(&new_name_str);
    ctx.add_node(Node {
        id: new_node_id.clone(),
        node_type: NodeType::Table,
        label: new_label.into(),
        qualified_name: Some(new_canonical.clone().into()),
        ..Default::default()
    });
    if let Some(span) = self.locate_relation_name_span(ctx, &new_name_str) {
        ctx.add_name_span(&new_node_id, span);
    }
    let edge_id = generate_edge_id(&old_node_id, &new_node_id);
    ctx.add_edge(Edge {
        id: edge_id,
        from: old_node_id,
        to: new_node_id,
        edge_type: EdgeType::DataFlow,
        expression: None,
        operation: Some("RENAME".into()),
        join_type: None,
        join_condition: None,
        metadata: None,
        approximate: None,
        statement_ids: Vec::new(),
    });
    self.tracker
        .record_consumed(&old_canonical, ctx.statement_index);
    self.tracker
        .record_produced(&new_canonical, ctx.statement_index);
}
/// Analyzes an UNLOAD statement. Prefers the already-parsed query; falls
/// back to re-parsing `query_text` with the request dialect, emitting a
/// warning issue when that text cannot be parsed.
pub(super) fn analyze_unload(
    &mut self,
    ctx: &mut StatementContext,
    query: &Option<Box<ast::Query>>,
    query_text: &Option<String>,
) {
    if let Some(parsed_query) = query {
        self.analyze_query(ctx, parsed_query, None);
        return;
    }
    let Some(text) = query_text else {
        return;
    };
    let dialect = self.request.dialect.to_sqlparser_dialect();
    match sqlparser::parser::Parser::parse_sql(dialect.as_ref(), text) {
        Ok(statements) => {
            for stmt in statements {
                if let Statement::Query(parsed_query) = stmt {
                    self.analyze_query(ctx, &parsed_query, None);
                }
            }
        }
        Err(_) => {
            self.issues.push(
                Issue::warning(
                    issue_codes::PARSE_ERROR,
                    "Could not parse UNLOAD query string for lineage analysis",
                )
                .with_statement(ctx.statement_index),
            );
        }
    }
}
/// True when template handling is configured for dbt mode.
#[cfg(feature = "templating")]
pub(super) fn is_dbt_mode(&self) -> bool {
    matches!(
        self.request.template_config.as_ref(),
        Some(config) if config.mode == TemplateMode::Dbt
    )
}
/// Without the `templating` feature, dbt mode is never active.
#[cfg(not(feature = "templating"))]
pub(super) fn is_dbt_mode(&self) -> bool {
    false
}
}
/// dbt materialization strategies this analyzer recognizes in a model's
/// `{{ config(materialized=...) }}` call.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum DbtMaterialization {
    Table,
    View,
    Incremental,
    Ephemeral,
    Snapshot,
    MaterializedView,
}
impl DbtMaterialization {
    /// Maps a dbt materialization strategy to the lineage node type used
    /// for the model's relation.
    fn node_type(self) -> NodeType {
        match self {
            DbtMaterialization::View | DbtMaterialization::MaterializedView => NodeType::View,
            DbtMaterialization::Ephemeral => NodeType::Cte,
            DbtMaterialization::Table
            | DbtMaterialization::Incremental
            | DbtMaterialization::Snapshot => NodeType::Table,
        }
    }
}
/// Captures the quoted value of a `materialized=...` kwarg, e.g.
/// `materialized='view'` or `materialized="table"` (case-insensitive).
static MATERIALIZED_KWARG: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r#"(?i)materialized\s*=\s*(?:'(\w+)'|"(\w+)")"#)
        .expect("materialized kwarg regex is valid")
});
/// Detects that a `materialized=` kwarg is present at all, even when the
/// value is not a simple quoted word (e.g. a Jinja expression).
static MATERIALIZED_KWARG_PRESENT: LazyLock<Regex> = LazyLock::new(|| {
    Regex::new(r"(?i)(^|[^A-Za-z0-9_])materialized\s*=")
        .expect("materialized presence regex is valid")
});
/// Outcome of scanning a dbt model's SQL for its `materialized` config.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum DbtMaterializationDetection {
    /// No `materialized=` kwarg found in any `config(...)` call.
    NotConfigured,
    /// A recognized materialization value was found.
    Known(DbtMaterialization),
    /// `materialized=` was present but its value is unknown or dynamic.
    Unresolved,
}
impl DbtMaterializationDetection {
    /// Node type for the detected materialization; `None` when no usable
    /// `materialized` setting was found.
    pub(super) fn node_type(self) -> Option<NodeType> {
        if let DbtMaterializationDetection::Known(m) = self {
            Some(m.node_type())
        } else {
            None
        }
    }
}
/// Scans dbt model SQL for `{{ config(...) }}` calls and reports the
/// model's `materialized` setting; when several config calls set it, the
/// last captured value wins.
pub(super) fn detect_dbt_model_materialization(sql: &str) -> DbtMaterializationDetection {
    // ASCII-lowercased copy for case-insensitive "config" searching; byte
    // offsets stay aligned because ASCII lowercasing preserves length.
    let lower = sql.to_ascii_lowercase();
    let mut search_from = 0;
    let mut last_value: Option<String> = None;
    // Whether any config body mentioned `materialized=` at all, even with
    // a value the kwarg regex cannot capture (e.g. a Jinja expression).
    let mut saw_materialized_kwarg = false;
    while let Some((body_range, next_search_from)) =
        find_next_config_call_body(sql, &lower, search_from)
    {
        let body = &sql[body_range.clone()];
        if !saw_materialized_kwarg && MATERIALIZED_KWARG_PRESENT.is_match(body) {
            saw_materialized_kwarg = true;
        }
        if let Some(captures) = MATERIALIZED_KWARG.captures(body) {
            // Group 1 = single-quoted value, group 2 = double-quoted.
            last_value = captures
                .get(1)
                .or_else(|| captures.get(2))
                .map(|m| m.as_str().to_string());
        }
        search_from = next_search_from;
    }
    if let Some(raw) = last_value {
        match raw.to_ascii_lowercase().as_str() {
            "table" => DbtMaterializationDetection::Known(DbtMaterialization::Table),
            "view" => DbtMaterializationDetection::Known(DbtMaterialization::View),
            "incremental" => DbtMaterializationDetection::Known(DbtMaterialization::Incremental),
            "ephemeral" => DbtMaterializationDetection::Known(DbtMaterialization::Ephemeral),
            "snapshot" => DbtMaterializationDetection::Known(DbtMaterialization::Snapshot),
            "materialized_view" => {
                DbtMaterializationDetection::Known(DbtMaterialization::MaterializedView)
            }
            // Adapter-specific or otherwise unknown strategy.
            _ => DbtMaterializationDetection::Unresolved,
        }
    } else if saw_materialized_kwarg {
        // `materialized=` present but the value was not a quoted word.
        DbtMaterializationDetection::Unresolved
    } else {
        DbtMaterializationDetection::NotConfigured
    }
}
/// Node type for a dbt model relation; defaults to `Table` when no SQL is
/// available or the materialization is unresolved/not configured.
pub(super) fn dbt_model_relation_type(sql: Option<&str>) -> NodeType {
    sql.map(detect_dbt_model_materialization)
        .and_then(DbtMaterializationDetection::node_type)
        .unwrap_or(NodeType::Table)
}
/// Upper bound on bytes scanned while searching for the closing paren of
/// a `config(...)` call; guards against pathological inputs.
const CONFIG_BODY_SCAN_LIMIT: usize = 1_000_000;
/// Maximum paren nesting depth tolerated inside a `config(...)` body.
const CONFIG_BODY_MAX_DEPTH: u32 = 256;
/// Finds the next `config( ... )` call at or after `search_from`.
///
/// Returns the byte range of the call body (between the parentheses) and
/// the offset at which to resume searching, or `None` when no further
/// complete call exists. `lower` must be the ASCII-lowercased copy of
/// `sql` (same length, aligned offsets).
fn find_next_config_call_body(
    sql: &str,
    lower: &str,
    search_from: usize,
) -> Option<(Range<usize>, usize)> {
    debug_assert_eq!(sql.len(), lower.len());
    let bytes = sql.as_bytes();
    let mut search_from = search_from;
    while let Some(rel) = lower[search_from..].find("config") {
        let start = search_from + rel;
        let after_keyword = start + "config".len();
        // Reject matches that are the tail of a longer identifier
        // (e.g. `reconfig`).
        if start > 0 {
            let prev = bytes[start - 1];
            if prev.is_ascii_alphanumeric() || prev == b'_' {
                search_from = after_keyword;
                continue;
            }
        }
        // Skip whitespace between `config` and the opening paren.
        let mut cursor = after_keyword;
        while cursor < bytes.len() && bytes[cursor].is_ascii_whitespace() {
            cursor += 1;
        }
        if cursor >= bytes.len() || bytes[cursor] != b'(' {
            search_from = after_keyword;
            continue;
        }
        let body_start = cursor + 1;
        let mut depth: u32 = 1;
        // Quote byte currently open, when inside a string literal.
        let mut quote: Option<u8> = None;
        let mut i = body_start;
        let mut iterations: usize = 0;
        while i < bytes.len() {
            iterations += 1;
            // Abort on pathologically long bodies.
            if iterations > CONFIG_BODY_SCAN_LIMIT {
                return None;
            }
            let b = bytes[i];
            if let Some(q) = quote {
                // Backslash escapes the next byte inside a string.
                if b == b'\\' && i + 1 < bytes.len() {
                    i += 2;
                    continue;
                }
                if b == q {
                    quote = None;
                }
            } else {
                match b {
                    b'\'' | b'"' => quote = Some(b),
                    b'(' => {
                        depth += 1;
                        if depth > CONFIG_BODY_MAX_DEPTH {
                            return None;
                        }
                    }
                    b')' => {
                        depth -= 1;
                        if depth == 0 {
                            // Body excludes parens; resume after the `)`.
                            return Some((body_start..i, i + 1));
                        }
                    }
                    _ => {}
                }
            }
            i += 1;
        }
        // Unterminated call: treat the remainder as unusable.
        return None;
    }
    None
}
#[cfg(test)]
mod dbt_materialization_tests {
    //! Unit tests for `detect_dbt_model_materialization` and the
    //! materialization -> node-type mapping.
    use super::{
        detect_dbt_model_materialization, DbtMaterialization, DbtMaterializationDetection,
    };
    /// Convenience: the detected materialization, or `None` when detection
    /// did not yield a `Known` value.
    fn known(sql: &str) -> Option<DbtMaterialization> {
        match detect_dbt_model_materialization(sql) {
            DbtMaterializationDetection::Known(m) => Some(m),
            _ => None,
        }
    }
    #[test]
    fn matches_single_quoted_view() {
        let sql = "{{ config(materialized='view') }}\nSELECT 1";
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn matches_double_quoted_view() {
        let sql = r#"{{ config(materialized="view") }} SELECT 1"#;
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn tolerates_whitespace_around_tokens() {
        let sql = "{{ config ( materialized = 'view' ) }} SELECT 1";
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn handles_nested_parens_in_sibling_kwargs() {
        let sql = "{{ config(materialized='view', partition_by=date_trunc('day', ts)) }} \
            SELECT 1";
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn recognizes_all_known_materializations() {
        let cases = [
            ("table", DbtMaterialization::Table),
            ("view", DbtMaterialization::View),
            ("incremental", DbtMaterialization::Incremental),
            ("ephemeral", DbtMaterialization::Ephemeral),
            ("snapshot", DbtMaterialization::Snapshot),
            ("materialized_view", DbtMaterialization::MaterializedView),
        ];
        for (keyword, expected) in cases {
            let sql = format!("{{{{ config(materialized='{keyword}') }}}} SELECT 1");
            assert_eq!(
                known(&sql),
                Some(expected),
                "failed for materialization '{keyword}'"
            );
        }
    }
    #[test]
    fn unknown_materialization_returns_unresolved() {
        let sql = "{{ config(materialized='customthing') }} SELECT 1";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::Unresolved
        );
    }
    #[test]
    fn node_type_mapping() {
        use crate::types::NodeType;
        assert_eq!(DbtMaterialization::Table.node_type(), NodeType::Table);
        assert_eq!(DbtMaterialization::Incremental.node_type(), NodeType::Table);
        assert_eq!(DbtMaterialization::Snapshot.node_type(), NodeType::Table);
        assert_eq!(DbtMaterialization::View.node_type(), NodeType::View);
        assert_eq!(DbtMaterialization::Ephemeral.node_type(), NodeType::Cte);
        assert_eq!(
            DbtMaterialization::MaterializedView.node_type(),
            NodeType::View
        );
    }
    #[test]
    fn ignores_materialized_outside_config() {
        let sql = "-- materialized='view'\nSELECT materialized FROM t";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::NotConfigured
        );
    }
    #[test]
    fn ignores_config_substring_of_another_identifier() {
        let sql = "{{ reconfig(materialized='view') }} SELECT 1";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::NotConfigured
        );
    }
    #[test]
    fn unterminated_config_call_returns_not_configured() {
        let sql = "{{ config(materialized='view'";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::NotConfigured
        );
    }
    #[test]
    fn paren_inside_string_does_not_close_config() {
        let sql = r#"{{ config(alias=')', materialized='view') }} SELECT 1"#;
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn case_insensitive_keywords() {
        let sql = "{{ CONFIG(MATERIALIZED='VIEW') }} SELECT 1";
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn returns_not_configured_when_materialized_kwarg_missing() {
        let sql = "{{ config(tags=['daily']) }} SELECT 1";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::NotConfigured
        );
    }
    #[test]
    fn finds_materialized_in_later_config_call() {
        let sql = "{{ config(tags=['daily']) }} {{ config(materialized='view') }} SELECT 1";
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn later_config_call_overrides_earlier_materialized_value() {
        // Last config call wins, matching dbt's own override behavior.
        let sql = "{{ config(materialized='table') }} {{ config(materialized='view') }} SELECT 1";
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn escaped_quote_in_earlier_kwarg_does_not_break_parsing() {
        let sql = r#"{{ config(alias='don\'t', materialized='view') }} SELECT 1"#;
        assert_eq!(known(sql), Some(DbtMaterialization::View));
    }
    #[test]
    fn escaped_quote_inside_materialized_value_is_unresolved() {
        let sql = r#"{{ config(materialized='it\'s_a_view') }} SELECT 1"#;
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::Unresolved
        );
    }
    #[test]
    fn unknown_adapter_materialization_reports_unresolved_and_defaults_to_table() {
        use super::dbt_model_relation_type;
        use crate::types::NodeType;
        let sql = "{{ config(materialized='dynamic_table') }} SELECT 1";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::Unresolved
        );
        assert_eq!(dbt_model_relation_type(Some(sql)), NodeType::Table);
    }
    #[test]
    fn dynamic_jinja_materialization_reports_unresolved_and_defaults_to_table() {
        use super::dbt_model_relation_type;
        use crate::types::NodeType;
        let sql =
            "{{ config(materialized=('view' if target.name == 'dev' else 'table')) }} SELECT 1";
        assert_eq!(
            detect_dbt_model_materialization(sql),
            DbtMaterializationDetection::Unresolved
        );
        assert_eq!(dbt_model_relation_type(Some(sql)), NodeType::Table);
    }
}
/// Derives the dbt model name from a file path: the final path component
/// (Unix or Windows separators) with a trailing `.sql` or `.sql.jinja`
/// extension removed.
pub(super) fn extract_model_name(path: &str) -> &str {
    let mut stem = path;
    // Keep only the text after the last separator of either flavor.
    for sep in ['/', '\\'] {
        if let Some((_, tail)) = stem.rsplit_once(sep) {
            stem = tail;
        }
    }
    stem.strip_suffix(".sql")
        .or_else(|| stem.strip_suffix(".sql.jinja"))
        .unwrap_or(stem)
}