use std::collections::{BTreeMap, BTreeSet};
use std::io::{self, Write};
use crate::graph::column_lineage::{ColumnImpactReport, ModelColumnLineage, TransformationType};
/// Prints the plain-text column lineage listing for `reports` to stdout.
///
/// Rendering is delegated to [`render_column_graph_plain_to_writer`] on a
/// locked stdout handle; the resulting `io::Result` is passed to
/// `super::handle_stdout_result`.
pub fn render_column_graph_plain(reports: &[ModelColumnLineage]) {
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    let outcome = render_column_graph_plain_to_writer(reports, &mut out);
    super::handle_stdout_result(outcome);
}
/// Writes a plain-text column lineage listing for every report to `w`.
///
/// Each report starts with a `model (traced/total columns traced)` header,
/// followed by one line per column. That line shows the column name (padded
/// to the widest name in the report), its first source, and the
/// transformation label. Any further sources go on continuation lines,
/// indented past the name field. Columns with no sources are shown as
/// `(no sources)`. Consecutive reports are separated by a blank line.
///
/// # Errors
///
/// Returns the first I/O error raised by `w`.
pub fn render_column_graph_plain_to_writer<W: Write>(
    reports: &[ModelColumnLineage],
    w: &mut W,
) -> io::Result<()> {
    for (i, report) in reports.iter().enumerate() {
        if i > 0 {
            writeln!(w)?;
        }
        writeln!(
            w,
            "{} ({}/{} columns traced)",
            report.model, report.traced_columns, report.total_columns
        )?;
        if report.columns.is_empty() {
            continue;
        }
        // `{:width$}` pads by `char` count, so the field width must be measured
        // in chars as well; byte length (`len()`) over-pads non-ASCII names.
        let col_width = report
            .columns
            .iter()
            .map(|e| e.column.chars().count())
            .max()
            .unwrap_or(0);
        let indent = " ".repeat(2 + col_width + 5);
        for entry in &report.columns {
            let (first, rest) = match entry.sources.split_first() {
                Some(parts) => parts,
                None => {
                    writeln!(
                        w,
                        " {:width$} (no sources)",
                        entry.column,
                        width = col_width
                    )?;
                    continue;
                }
            };
            // Every source line of this column repeats the same label.
            let label = transformation_label(&entry.transformation);
            let src_str = format_source(
                first.table.as_str(),
                first.column.as_str(),
                &first.model_path,
            );
            writeln!(
                w,
                " {:width$} → {} ({})",
                entry.column,
                src_str,
                label,
                width = col_width
            )?;
            for src in rest {
                let src_str =
                    format_source(src.table.as_str(), src.column.as_str(), &src.model_path);
                writeln!(w, "{} {} ({})", indent, src_str, label)?;
            }
        }
    }
    Ok(())
}
/// Prints the Mermaid flowchart of column lineage for `reports` to stdout.
///
/// Rendering is delegated to [`render_column_graph_mermaid_to_writer`] on a
/// locked stdout handle; the resulting `io::Result` is passed to
/// `super::handle_stdout_result`.
pub fn render_column_graph_mermaid(reports: &[ModelColumnLineage]) {
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    let outcome = render_column_graph_mermaid_to_writer(reports, &mut out);
    super::handle_stdout_result(outcome);
}
/// Writes column-level lineage for `reports` to `w` as a Mermaid `flowchart LR`.
///
/// Each report's model, and each source table referenced by its columns,
/// becomes a `subgraph`. Subgraphs are ordered by name and numbered `sg0`,
/// `sg1`, …. Each subgraph holds one node per column, with ids of the form
/// `n<model>_<column>` taken from sorted positions. Every (source column →
/// target column) pair becomes an edge whose label is the transformation
/// label. When the source's `model_path` is non-empty, the label also gets a
/// ` (via …)` suffix listing the escaped path. Identical edge lines are
/// written only once.
///
/// # Errors
///
/// Returns the first I/O error raised by `w`.
pub fn render_column_graph_mermaid_to_writer<W: Write>(
    reports: &[ModelColumnLineage],
    w: &mut W,
) -> io::Result<()> {
    writeln!(w, "flowchart LR")?;

    // Phase 1: gather every (model -> columns) node set and every labelled edge.
    let mut nodes: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    let mut edges: Vec<(String, String, String, String, String)> = Vec::new();
    for report in reports {
        // The target model is registered even when it has no columns, so it
        // still gets an (empty) subgraph.
        nodes
            .entry(report.model.clone())
            .or_default()
            .extend(report.columns.iter().map(|entry| entry.column.clone()));
        for entry in &report.columns {
            let kind = transformation_label(&entry.transformation);
            for src in &entry.sources {
                nodes
                    .entry(src.table.clone())
                    .or_default()
                    .insert(src.column.clone());
                let label = if src.model_path.is_empty() {
                    kind.to_string()
                } else {
                    let hops: Vec<String> = src
                        .model_path
                        .iter()
                        .map(|m| super::mermaid_escape(m))
                        .collect();
                    format!("{} (via {})", kind, hops.join(" \u{2192} "))
                };
                edges.push((
                    src.table.clone(),
                    src.column.clone(),
                    report.model.clone(),
                    entry.column.clone(),
                    label,
                ));
            }
        }
    }

    // Phase 2: positional ids; BTree iteration order is sorted, so ids are stable.
    let model_index: BTreeMap<&str, usize> =
        nodes.keys().map(String::as_str).zip(0..).collect();
    let column_index: BTreeMap<&str, BTreeMap<&str, usize>> = nodes
        .iter()
        .map(|(model, cols)| {
            (
                model.as_str(),
                cols.iter().map(String::as_str).zip(0..).collect(),
            )
        })
        .collect();

    // Phase 3: emit subgraphs, then the de-duplicated edges.
    for (midx, (model, cols)) in nodes.iter().enumerate() {
        writeln!(w, " subgraph sg{}[\"{}\"]", midx, super::mermaid_escape(model))?;
        for (cidx, col) in cols.iter().enumerate() {
            writeln!(w, " n{}_{}[\"{}\"]", midx, cidx, super::mermaid_escape(col))?;
        }
        writeln!(w, " end")?;
    }
    writeln!(w)?;

    let mut emitted: BTreeSet<String> = BTreeSet::new();
    for (src_model, src_col, dst_model, dst_col, label) in &edges {
        let line = format!(
            " {} -->|\"{}\"|{}",
            indexed_node_id(&model_index, &column_index, src_model, src_col),
            label,
            indexed_node_id(&model_index, &column_index, dst_model, dst_col),
        );
        if !emitted.contains(&line) {
            writeln!(w, "{}", line)?;
            emitted.insert(line);
        }
    }
    Ok(())
}
/// Prints the plain-text column impact listing for `reports` to stdout.
///
/// Rendering is delegated to [`render_column_impact_plain_to_writer`] on a
/// locked stdout handle; the resulting `io::Result` is passed to
/// `super::handle_stdout_result`.
pub fn render_column_impact_plain(reports: &[ColumnImpactReport]) {
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    let outcome = render_column_impact_plain_to_writer(reports, &mut out);
    super::handle_stdout_result(outcome);
}
/// Writes a plain-text column impact listing for every report to `w`.
///
/// Each report starts with a `model.column (N impacted column[s])` header.
/// It is followed by one line per impacted column, showing the column name
/// (padded to the widest name in the report), its model, and the
/// transformation label. When `model_path` has more than one entry, the line
/// also shows `, via …` with every entry except the last. Consecutive
/// reports are separated by a blank line.
///
/// # Errors
///
/// Returns the first I/O error raised by `w`.
pub fn render_column_impact_plain_to_writer<W: Write>(
    reports: &[ColumnImpactReport],
    w: &mut W,
) -> io::Result<()> {
    for (i, report) in reports.iter().enumerate() {
        if i > 0 {
            writeln!(w)?;
        }
        let count = report.impacted_columns.len();
        writeln!(
            w,
            "{}.{} ({} impacted column{})",
            report.model,
            report.column,
            count,
            if count == 1 { "" } else { "s" },
        )?;
        if report.impacted_columns.is_empty() {
            continue;
        }
        // `{:width$}` pads by `char` count, so the field width must be measured
        // in chars as well; byte length (`len()`) over-pads non-ASCII names.
        let col_width = report
            .impacted_columns
            .iter()
            .map(|c| c.column.chars().count())
            .max()
            .unwrap_or(0);
        for ic in &report.impacted_columns {
            // The last path entry is dropped; in the visible tests it is the
            // impacted model itself, so only the hops before it count as "via".
            let via_str = match ic.model_path.split_last() {
                Some((_, intermediate)) if !intermediate.is_empty() => {
                    format!(", via {}", intermediate.join(" → "))
                }
                _ => String::new(),
            };
            writeln!(
                w,
                " {:width$} → {} ({}{})",
                ic.column,
                ic.model,
                transformation_label(&ic.transformation),
                via_str,
                width = col_width
            )?;
        }
    }
    Ok(())
}
/// Prints the Mermaid flowchart of column impact for `reports` to stdout.
///
/// Rendering is delegated to [`render_column_impact_mermaid_to_writer`] on a
/// locked stdout handle; the resulting `io::Result` is passed to
/// `super::handle_stdout_result`.
pub fn render_column_impact_mermaid(reports: &[ColumnImpactReport]) {
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    let outcome = render_column_impact_mermaid_to_writer(reports, &mut out);
    super::handle_stdout_result(outcome);
}
/// Writes column impact for `reports` to `w` as a Mermaid `flowchart LR`.
///
/// The queried `model.column` of each report, and every impacted column,
/// become nodes inside one `subgraph` per model. Subgraphs are ordered by
/// name, with ids `sg<i>`; node ids are `n<model>_<column>` by sorted
/// position. One edge is drawn from the queried column to each impacted
/// column, labelled with the transformation. When `model_path` has more than
/// one entry, the label also gets ` (via …)` with the escaped entries except
/// the last. Identical edge lines are written only once.
///
/// # Errors
///
/// Returns the first I/O error raised by `w`.
pub fn render_column_impact_mermaid_to_writer<W: Write>(
    reports: &[ColumnImpactReport],
    w: &mut W,
) -> io::Result<()> {
    writeln!(w, "flowchart LR")?;

    // Phase 1: gather every (model -> columns) node set and every labelled edge.
    let mut nodes: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    let mut edges: Vec<(String, String, String, String, String)> = Vec::new();
    for report in reports {
        nodes
            .entry(report.model.clone())
            .or_default()
            .insert(report.column.clone());
        for ic in &report.impacted_columns {
            nodes
                .entry(ic.model.clone())
                .or_default()
                .insert(ic.column.clone());
            let kind = transformation_label(&ic.transformation);
            let label = match ic.model_path.split_last() {
                Some((_, hops)) if !hops.is_empty() => {
                    let escaped: Vec<String> =
                        hops.iter().map(|m| super::mermaid_escape(m)).collect();
                    format!("{} (via {})", kind, escaped.join(" \u{2192} "))
                }
                _ => kind.to_string(),
            };
            edges.push((
                report.model.clone(),
                report.column.clone(),
                ic.model.clone(),
                ic.column.clone(),
                label,
            ));
        }
    }

    // Phase 2: positional ids; BTree iteration order is sorted, so ids are stable.
    let model_index: BTreeMap<&str, usize> =
        nodes.keys().map(String::as_str).zip(0..).collect();
    let column_index: BTreeMap<&str, BTreeMap<&str, usize>> = nodes
        .iter()
        .map(|(model, cols)| {
            (
                model.as_str(),
                cols.iter().map(String::as_str).zip(0..).collect(),
            )
        })
        .collect();

    // Phase 3: emit subgraphs, then the de-duplicated edges.
    for (midx, (model, cols)) in nodes.iter().enumerate() {
        writeln!(w, " subgraph sg{}[\"{}\"]", midx, super::mermaid_escape(model))?;
        for (cidx, col) in cols.iter().enumerate() {
            writeln!(w, " n{}_{}[\"{}\"]", midx, cidx, super::mermaid_escape(col))?;
        }
        writeln!(w, " end")?;
    }
    writeln!(w)?;

    let mut emitted: BTreeSet<String> = BTreeSet::new();
    for (src_model, src_col, dst_model, dst_col, label) in &edges {
        let line = format!(
            " {} -->|\"{}\"|{}",
            indexed_node_id(&model_index, &column_index, src_model, src_col),
            label,
            indexed_node_id(&model_index, &column_index, dst_model, dst_col),
        );
        if !emitted.contains(&line) {
            writeln!(w, "{}", line)?;
            emitted.insert(line);
        }
    }
    Ok(())
}
fn transformation_label(t: &TransformationType) -> &'static str {
match t {
TransformationType::Direct => "direct",
TransformationType::Aggregation => "aggregation",
TransformationType::Expression => "expression",
TransformationType::Cast => "cast",
TransformationType::Conditional => "conditional",
TransformationType::Unknown => "unknown",
}
}
/// Formats a source column as `table.column`.
///
/// When `model_path` is non-empty, ` via a → b → …` is appended, listing the
/// path entries in order.
fn format_source(table: &str, column: &str, model_path: &[String]) -> String {
    let qualified = format!("{}.{}", table, column);
    match model_path {
        [] => qualified,
        hops => format!("{} via {}", qualified, hops.join(" → ")),
    }
}
/// Builds the Mermaid node id `n<model_idx>_<column_idx>` for a column.
///
/// `model_index` maps a model name to its position, and `column_index` maps
/// a model name to a map from column name to position.
///
/// # Panics
///
/// Panics if `model` is missing from `model_index`, or if `col` is missing
/// from `model`'s entry in `column_index`.
fn indexed_node_id(
    model_index: &BTreeMap<&str, usize>,
    column_index: &BTreeMap<&str, BTreeMap<&str, usize>>,
    model: &str,
    col: &str,
) -> String {
    let model_slot = model_index
        .get(model)
        .expect("model must be registered before calling indexed_node_id");
    let column_slot = column_index
        .get(model)
        .and_then(|cols| cols.get(col))
        .expect("column must be registered before calling indexed_node_id");
    format!("n{}_{}", model_slot, column_slot)
}
// Snapshot tests for the plain-text and Mermaid column lineage/impact
// renderers. insta derives snapshot names from the test function names, so
// renaming a test orphans its stored snapshot.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::column_lineage::{
        ColumnLineageEntry, ColumnSource, ImpactedColumn, ModelColumnLineage,
    };
    // Builds a `ModelColumnLineage` for `model` from
    // `(column, transformation, [(source_table, source_column)])` tuples.
    // Both traced and total counts are set to the number of entries, every
    // source gets an empty `model_path`, and `errors` is empty.
    fn make_lineage(
        model: &str,
        entries: Vec<(&str, TransformationType, Vec<(&str, &str)>)>,
    ) -> ModelColumnLineage {
        let traced = entries.len();
        let total = entries.len();
        ModelColumnLineage {
            model: model.to_string(),
            traced_columns: traced,
            total_columns: total,
            columns: entries
                .into_iter()
                .map(|(col, trans, sources)| ColumnLineageEntry {
                    column: col.to_string(),
                    transformation: trans,
                    sources: sources
                        .into_iter()
                        .map(|(table, column)| ColumnSource {
                            table: table.to_string(),
                            column: column.to_string(),
                            model_path: vec![],
                        })
                        .collect(),
                })
                .collect(),
            errors: vec![],
        }
    }
    // The helpers below render into an in-memory buffer and return the output
    // as a UTF-8 `String`; they panic (via `unwrap`) on I/O or UTF-8 errors.
    fn graph_plain(reports: &[ModelColumnLineage]) -> String {
        let mut buf = Vec::new();
        render_column_graph_plain_to_writer(reports, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }
    fn graph_mermaid(reports: &[ModelColumnLineage]) -> String {
        let mut buf = Vec::new();
        render_column_graph_mermaid_to_writer(reports, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }
    fn impact_plain(reports: &[ColumnImpactReport]) -> String {
        let mut buf = Vec::new();
        render_column_impact_plain_to_writer(reports, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }
    fn impact_mermaid(reports: &[ColumnImpactReport]) -> String {
        let mut buf = Vec::new();
        render_column_impact_mermaid_to_writer(reports, &mut buf).unwrap();
        String::from_utf8(buf).unwrap()
    }
    // Two columns with one source each: exercises name padding and labels.
    #[test]
    fn test_plain_single_model() {
        let report = make_lineage(
            "orders",
            vec![
                (
                    "order_id",
                    TransformationType::Direct,
                    vec![("stg_orders", "order_id")],
                ),
                (
                    "total",
                    TransformationType::Expression,
                    vec![("stg_orders", "price")],
                ),
            ],
        );
        insta::assert_snapshot!(graph_plain(&[report]));
    }
    // A column with an empty source list takes the "(no sources)" branch.
    #[test]
    fn test_plain_no_sources() {
        let report = ModelColumnLineage {
            model: "orders".to_string(),
            traced_columns: 0,
            total_columns: 1,
            columns: vec![ColumnLineageEntry {
                column: "id".to_string(),
                transformation: TransformationType::Unknown,
                sources: vec![],
            }],
            errors: vec![],
        };
        insta::assert_snapshot!(graph_plain(&[report]));
    }
    #[test]
    fn test_mermaid_single_model() {
        let report = make_lineage(
            "orders",
            vec![(
                "order_id",
                TransformationType::Direct,
                vec![("stg_orders", "order_id")],
            )],
        );
        insta::assert_snapshot!(graph_mermaid(&[report]));
    }
    // A source table name containing a dot must still produce valid node ids.
    #[test]
    fn test_mermaid_dotted_table_name() {
        let report = make_lineage(
            "orders",
            vec![("id", TransformationType::Direct, vec![("raw.orders", "id")])],
        );
        insta::assert_snapshot!(graph_mermaid(&[report]));
    }
    // "raw_orders" and "raw.orders" must map to distinct subgraphs. Presumably
    // this guards against an id scheme that sanitised '.' to '_' — confirm.
    #[test]
    fn test_mermaid_id_collision_avoided() {
        let report = make_lineage(
            "raw_orders",
            vec![("id", TransformationType::Direct, vec![("raw.orders", "id")])],
        );
        insta::assert_snapshot!(graph_mermaid(&[report]));
    }
    // Column names with '<' / '>' go through `mermaid_escape` in node labels.
    #[test]
    fn test_mermaid_label_escaping() {
        let report = make_lineage(
            "orders",
            vec![(
                "amount<usd>",
                TransformationType::Direct,
                vec![("raw.orders", "amount<usd>")],
            )],
        );
        insta::assert_snapshot!(graph_mermaid(&[report]));
    }
    // Single-hop impact: `model_path` has one entry, so no "via" suffix.
    #[test]
    fn test_impact_plain() {
        let report = ColumnImpactReport {
            model: "stg_orders".to_string(),
            column: "order_id".to_string(),
            impacted_columns: vec![ImpactedColumn {
                unique_id: "model.orders".to_string(),
                model: "orders".to_string(),
                column: "order_id".to_string(),
                transformation: TransformationType::Direct,
                model_path: vec!["orders".to_string()],
            }],
            errors: vec![],
        };
        insta::assert_snapshot!(impact_plain(&[report]));
    }
    // Two-hop impact: every path entry except the last is listed as "via".
    #[test]
    fn test_impact_plain_multi_hop() {
        let report = ColumnImpactReport {
            model: "stg_orders".to_string(),
            column: "order_id".to_string(),
            impacted_columns: vec![ImpactedColumn {
                unique_id: "model.customers".to_string(),
                model: "customers".to_string(),
                column: "customer_order_id".to_string(),
                transformation: TransformationType::Direct,
                model_path: vec!["orders".to_string(), "customers".to_string()],
            }],
            errors: vec![],
        };
        insta::assert_snapshot!(impact_plain(&[report]));
    }
    #[test]
    fn test_impact_mermaid() {
        let report = ColumnImpactReport {
            model: "stg_orders".to_string(),
            column: "order_id".to_string(),
            impacted_columns: vec![ImpactedColumn {
                unique_id: "model.orders".to_string(),
                model: "orders".to_string(),
                column: "order_id".to_string(),
                transformation: TransformationType::Direct,
                model_path: vec!["orders".to_string()],
            }],
            errors: vec![],
        };
        insta::assert_snapshot!(impact_mermaid(&[report]));
    }
    // Multi-hop impact: the edge label carries the "(via …)" suffix.
    #[test]
    fn test_impact_mermaid_indirect_edge_label() {
        let report = ColumnImpactReport {
            model: "stg_orders".to_string(),
            column: "order_id".to_string(),
            impacted_columns: vec![ImpactedColumn {
                unique_id: "model.customers".to_string(),
                model: "customers".to_string(),
                column: "customer_order_id".to_string(),
                transformation: TransformationType::Direct,
                model_path: vec!["orders".to_string(), "customers".to_string()],
            }],
            errors: vec![],
        };
        insta::assert_snapshot!(impact_mermaid(&[report]));
    }
}