use std::collections::{HashMap, HashSet};
use std::io::{IsTerminal, Write};
use path_slash::PathExt as _;
use petgraph::visit::{EdgeRef, IntoEdgeReferences};
use serde::Serialize;
use serde_json::Value;
use crate::graph::types::*;
/// Every node field name that can be selected for JSON output.
///
/// [`resolve_graph_fields`] validates requested names against this list, and
/// [`build_node_value`] has one emitting branch per entry.
pub const GRAPH_NODE_FIELDS: &[&str] = &[
    "unique_id",
    "label",
    "node_type",
    "file_path",
    "description",
    "materialization",
    "tags",
    "columns",
    "sql_content",
    "exposure",
];

/// Field names selected by [`resolve_graph_fields`] when the caller supplies
/// no explicit field list and does not ask for the full set.
pub const GRAPH_DEFAULT_FIELDS: &[&str] = &["unique_id", "label", "node_type", "file_path"];
/// Turns the JSON field-selection options into the concrete set of node
/// fields to emit.
///
/// * `json_full == true` selects every name in [`GRAPH_NODE_FIELDS`];
///   `json_fields` is ignored (and not validated) in that case.
/// * `json_fields == Some(list)` selects exactly the listed names; repeated
///   names collapse because the result is a set.
/// * Otherwise the names in [`GRAPH_DEFAULT_FIELDS`] are selected.
///
/// # Errors
///
/// Returns a message naming every requested field that is not in
/// [`GRAPH_NODE_FIELDS`] (in request order), followed by the list of
/// available fields.
pub fn resolve_graph_fields(
    json_fields: Option<&[String]>,
    json_full: bool,
) -> Result<HashSet<String>, String> {
    let owned_set = |names: &[&str]| -> HashSet<String> {
        names.iter().map(|name| (*name).to_string()).collect()
    };

    if json_full {
        return Ok(owned_set(GRAPH_NODE_FIELDS));
    }
    let Some(requested) = json_fields else {
        return Ok(owned_set(GRAPH_DEFAULT_FIELDS));
    };

    // Keep the offending names in the order the caller wrote them.
    let rejected: Vec<&str> = requested
        .iter()
        .map(String::as_str)
        .filter(|name| !GRAPH_NODE_FIELDS.contains(name))
        .collect();

    if rejected.is_empty() {
        Ok(requested.iter().cloned().collect())
    } else {
        Err(format!(
            "unknown JSON field(s): {}. Available fields: {}",
            rejected.join(", "),
            GRAPH_NODE_FIELDS.join(", "),
        ))
    }
}
/// Top-level JSON document produced by `render_json_to_writer`.
#[derive(Serialize)]
struct JsonGraph {
    /// One JSON value per graph node, as built by [`build_node_value`].
    nodes: Vec<Value>,
    /// One record per graph edge.
    edges: Vec<JsonEdge>,
}
/// Serialized form of a single graph edge.
#[derive(Serialize)]
struct JsonEdge {
    /// `unique_id` of the node the edge starts at.
    source: String,
    /// `unique_id` of the node the edge points to.
    target: String,
    /// Label of the edge's type.
    edge_type: String,
    /// Copied from the edge weight's `collapsed_through`; the key is left out
    /// of the output entirely when the value is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    collapsed_through: Option<usize>,
}
pub fn build_node_value(
node: &NodeData,
fields: &HashSet<String>,
sql_contents: Option<&HashMap<String, String>>,
) -> Value {
let mut map = serde_json::Map::new();
if fields.contains("unique_id") {
map.insert("unique_id".into(), Value::String(node.unique_id.clone()));
}
if fields.contains("label") {
map.insert("label".into(), Value::String(node.label.clone()));
}
if fields.contains("node_type") {
map.insert(
"node_type".into(),
Value::String(node.node_type.label().to_string()),
);
}
if fields.contains("file_path") {
map.insert(
"file_path".into(),
match node.file_path {
Some(ref p) => Value::String(p.to_slash_lossy().into_owned()),
None => Value::Null,
},
);
}
if fields.contains("description") {
map.insert(
"description".into(),
match node.description {
Some(ref d) => Value::String(d.clone()),
None => Value::Null,
},
);
}
if fields.contains("materialization") {
map.insert(
"materialization".into(),
match node.materialization {
Some(ref m) => Value::String(m.clone()),
None => Value::Null,
},
);
}
if fields.contains("tags") {
map.insert(
"tags".into(),
Value::Array(node.tags.iter().map(|t| Value::String(t.clone())).collect()),
);
}
if fields.contains("columns") {
map.insert(
"columns".into(),
Value::Array(
node.columns
.iter()
.map(|c| Value::String(c.clone()))
.collect(),
),
);
}
if fields.contains("sql_content") {
map.insert(
"sql_content".into(),
match sql_contents.and_then(|m| m.get(&node.unique_id)) {
Some(sql) => Value::String(sql.clone()),
None => Value::Null,
},
);
}
if fields.contains("exposure") {
let opt_str = |v: &Option<String>| -> Value {
v.as_ref().map_or(Value::Null, |s| Value::String(s.clone()))
};
map.insert(
"exposure".into(),
match node.exposure {
Some(ref exp) => {
let mut exp_map = serde_json::Map::new();
exp_map.insert("label".into(), opt_str(&exp.label));
exp_map.insert("type".into(), opt_str(&exp.exposure_type));
exp_map.insert("url".into(), opt_str(&exp.url));
exp_map.insert("maturity".into(), opt_str(&exp.maturity));
exp_map.insert(
"owner".into(),
match exp.owner {
Some(ref o) => {
let mut owner_map = serde_json::Map::new();
owner_map.insert("name".into(), opt_str(&o.name));
owner_map.insert("email".into(), opt_str(&o.email));
Value::Object(owner_map)
}
None => Value::Null,
},
);
Value::Object(exp_map)
}
None => Value::Null,
},
);
}
Value::Object(map)
}
/// Writes `graph` as JSON to standard output.
///
/// The output is pretty-printed when stdout is a terminal and compact
/// otherwise. The write result is passed to `super::handle_stdout_result`.
pub fn render_json(
    graph: &LineageGraph,
    sql_contents: Option<&HashMap<String, String>>,
    fields: &HashSet<String>,
) {
    // Lock once; the same handle is used for the terminal check and the write.
    let mut out = std::io::stdout().lock();
    let to_terminal = out.is_terminal();
    let outcome = render_json_to_writer(graph, sql_contents, fields, &mut out, to_terminal);
    super::handle_stdout_result(outcome);
}
/// Serializes `graph` as one JSON document with `nodes` and `edges` arrays,
/// writes it to `w`, and appends a trailing newline.
///
/// * `sql_contents` and `fields` are forwarded to [`build_node_value`] for
///   every node.
/// * `pretty` selects indented output; otherwise the document is compact.
///
/// Output order is canonical: nodes are sorted by `unique_id`, edges by
/// `(source, target, edge_type, collapsed_through)`. Including
/// `collapsed_through` in the key means parallel edges that differ only in
/// that value are emitted in the same order however the graph was built.
///
/// # Errors
///
/// Returns an `io::Error` when serialization fails (serde_json errors are
/// mapped through `super::serde_io_error`) or when writing the trailing
/// newline fails.
fn render_json_to_writer<W: Write>(
    graph: &LineageGraph,
    sql_contents: Option<&HashMap<String, String>>,
    fields: &HashSet<String>,
    w: &mut W,
    pretty: bool,
) -> std::io::Result<()> {
    // The sort key is carried separately from the value because `unique_id`
    // may not be among the selected fields.
    let mut keyed_nodes: Vec<(String, Value)> = graph
        .node_indices()
        .map(|idx| {
            let node = &graph[idx];
            (
                node.unique_id.clone(),
                build_node_value(node, fields, sql_contents),
            )
        })
        .collect();
    keyed_nodes.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    let nodes: Vec<Value> = keyed_nodes.into_iter().map(|(_, value)| value).collect();

    let mut edges: Vec<JsonEdge> = graph
        .edge_references()
        .map(|edge| {
            let weight = edge.weight();
            JsonEdge {
                source: graph[edge.source()].unique_id.clone(),
                target: graph[edge.target()].unique_id.clone(),
                edge_type: weight.edge_type.label().to_string(),
                collapsed_through: weight.collapsed_through,
            }
        })
        .collect();
    // Compare on every serialized field: an unstable sort may reorder
    // elements it considers equal, so any field left out of the key could
    // leak graph insertion order into the output. `then_with` skips the later
    // comparisons once an earlier one has decided the order.
    edges.sort_unstable_by(|a, b| {
        a.source
            .cmp(&b.source)
            .then_with(|| a.target.cmp(&b.target))
            .then_with(|| a.edge_type.cmp(&b.edge_type))
            .then_with(|| a.collapsed_through.cmp(&b.collapsed_through))
    });

    let json_graph = JsonGraph { nodes, edges };
    if pretty {
        serde_json::to_writer_pretty(&mut *w, &json_graph).map_err(super::serde_io_error)?;
    } else {
        serde_json::to_writer(&mut *w, &json_graph).map_err(super::serde_io_error)?;
    }
    writeln!(w)?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::render::test_helpers::make_node;
    use std::path::PathBuf;

    // ---- shared fixtures -------------------------------------------------

    /// Field selection containing every name in `GRAPH_NODE_FIELDS`.
    fn all_fields() -> HashSet<String> {
        GRAPH_NODE_FIELDS.iter().map(|s| (*s).to_string()).collect()
    }

    /// Renders `graph` into memory with the given options and returns the text.
    fn render_with(
        graph: &LineageGraph,
        sql_contents: Option<&HashMap<String, String>>,
        fields: &HashSet<String>,
        pretty: bool,
    ) -> String {
        let mut buf = Vec::new();
        render_json_to_writer(graph, sql_contents, fields, &mut buf, pretty).unwrap();
        String::from_utf8(buf).unwrap()
    }

    /// Pretty-printed rendering with all fields selected and no SQL contents.
    fn render_to_string(graph: &LineageGraph) -> String {
        render_with(graph, None, &all_fields(), true)
    }

    /// Parses renderer output, panicking if it is not valid JSON.
    fn parse(output: &str) -> serde_json::Value {
        serde_json::from_str(output).unwrap()
    }

    /// `model.orders` node with every optional attribute unset or empty.
    fn bare_orders_model() -> NodeData {
        NodeData {
            unique_id: "model.orders".into(),
            label: "orders".into(),
            node_type: NodeType::Model,
            file_path: None,
            description: None,
            materialization: None,
            tags: vec![],
            columns: vec![],
            exposure: None,
        }
    }

    /// `model.orders` node with path, description, materialization, one tag
    /// and one column populated.
    fn populated_orders_model() -> NodeData {
        NodeData {
            file_path: orders_sql_path(),
            description: Some("desc".into()),
            materialization: Some("table".into()),
            tags: vec!["daily".into()],
            columns: vec!["id".into()],
            ..bare_orders_model()
        }
    }

    fn orders_sql_path() -> Option<PathBuf> {
        Some(PathBuf::from("models/orders.sql"))
    }

    /// Graph with `model.a -> model.b` connected by a direct `Ref` edge.
    fn two_model_graph() -> LineageGraph {
        let mut graph = LineageGraph::new();
        let a = graph.add_node(make_node("model.a", "a", NodeType::Model));
        let b = graph.add_node(make_node("model.b", "b", NodeType::Model));
        graph.add_edge(a, b, EdgeData::direct(EdgeType::Ref));
        graph
    }

    // ---- structure of the document ---------------------------------------

    #[test]
    fn test_empty_graph() {
        let parsed = parse(&render_to_string(&LineageGraph::new()));
        for key in ["nodes", "edges"] {
            assert_eq!(parsed[key].as_array().unwrap().len(), 0);
        }
    }

    #[test]
    fn test_single_node() {
        let mut graph = LineageGraph::new();
        graph.add_node(make_node("model.orders", "orders", NodeType::Model));

        let parsed = parse(&render_to_string(&graph));
        let nodes = parsed["nodes"].as_array().unwrap();
        assert_eq!(nodes.len(), 1);

        let node = &nodes[0];
        assert_eq!(node["unique_id"], "model.orders");
        assert_eq!(node["label"], "orders");
        assert_eq!(node["node_type"], "model");
        assert!(node["file_path"].is_null());
        assert!(node["description"].is_null());
    }

    #[test]
    fn test_node_with_file_path_and_description() {
        let mut graph = LineageGraph::new();
        graph.add_node(NodeData {
            file_path: orders_sql_path(),
            description: Some("Orders mart model".into()),
            ..bare_orders_model()
        });

        let parsed = parse(&render_to_string(&graph));
        let nodes = parsed["nodes"].as_array().unwrap();
        assert_eq!(nodes[0]["file_path"], "models/orders.sql");
        assert_eq!(nodes[0]["description"], "Orders mart model");
    }

    #[test]
    fn test_edges() {
        let mut graph = LineageGraph::new();
        let raw = graph.add_node(make_node(
            "source.raw.orders",
            "raw.orders",
            NodeType::Source,
        ));
        let staged = graph.add_node(make_node("model.stg_orders", "stg_orders", NodeType::Model));
        graph.add_edge(raw, staged, EdgeData::direct(EdgeType::Source));

        let parsed = parse(&render_to_string(&graph));
        let edges = parsed["edges"].as_array().unwrap();
        assert_eq!(edges.len(), 1);

        let edge = &edges[0];
        assert_eq!(edge["source"], "source.raw.orders");
        assert_eq!(edge["target"], "model.stg_orders");
        assert_eq!(edge["edge_type"], "source");
    }

    #[test]
    fn test_all_edge_types() {
        let cases = [
            (EdgeType::Ref, "ref"),
            (EdgeType::Source, "source"),
            (EdgeType::Test, "test"),
            (EdgeType::Exposure, "exposure"),
        ];
        for (edge_type, expected) in cases {
            assert_eq!(edge_type.label(), expected);
        }
    }

    #[test]
    fn test_all_node_types() {
        let cases = [
            ("model.a", NodeType::Model, "model"),
            ("source.a.b", NodeType::Source, "source"),
            ("seed.a", NodeType::Seed, "seed"),
            ("snapshot.a", NodeType::Snapshot, "snapshot"),
            ("test.a", NodeType::Test, "test"),
            ("exposure.a", NodeType::Exposure, "exposure"),
            ("model.unknown", NodeType::Phantom, "phantom"),
        ];

        let mut graph = LineageGraph::new();
        for (id, node_type, _) in &cases {
            graph.add_node(make_node(id, "a", *node_type));
        }

        let parsed = parse(&render_to_string(&graph));
        let mut actual: Vec<(&str, &str)> = parsed["nodes"]
            .as_array()
            .unwrap()
            .iter()
            .map(|n| {
                let id = n["unique_id"].as_str().unwrap();
                let node_type = n["node_type"].as_str().unwrap();
                (id, node_type)
            })
            .collect();
        actual.sort_unstable();

        let mut expected: Vec<(&str, &str)> =
            cases.iter().map(|&(id, _, name)| (id, name)).collect();
        expected.sort_unstable();

        assert_eq!(actual, expected);
    }

    // ---- ordering --------------------------------------------------------

    #[test]
    fn test_deterministic_node_order() {
        let mut graph = LineageGraph::new();
        // Inserted out of order on purpose.
        for (id, label) in [
            ("model.z_last", "z_last"),
            ("model.a_first", "a_first"),
            ("model.m_middle", "m_middle"),
        ] {
            graph.add_node(make_node(id, label, NodeType::Model));
        }

        let parsed = parse(&render_to_string(&graph));
        let nodes = parsed["nodes"].as_array().unwrap();
        let expected = ["model.a_first", "model.m_middle", "model.z_last"];
        for (i, &id) in expected.iter().enumerate() {
            assert_eq!(nodes[i]["unique_id"], id);
        }
    }

    #[test]
    fn test_deterministic_edge_order() {
        let mut graph = LineageGraph::new();
        let a = graph.add_node(make_node("model.a", "a", NodeType::Model));
        let b = graph.add_node(make_node("model.b", "b", NodeType::Model));
        let c = graph.add_node(make_node("model.c", "c", NodeType::Model));
        // Inserted out of order on purpose.
        for (from, to) in [(c, a), (a, b), (a, c)] {
            graph.add_edge(from, to, EdgeData::direct(EdgeType::Ref));
        }

        let parsed = parse(&render_to_string(&graph));
        let edges = parsed["edges"].as_array().unwrap();
        let expected = [
            ("model.a", "model.b"),
            ("model.a", "model.c"),
            ("model.c", "model.a"),
        ];
        for (i, &(source, target)) in expected.iter().enumerate() {
            assert_eq!(edges[i]["source"], source);
            assert_eq!(edges[i]["target"], target);
        }
    }

    // ---- output format ---------------------------------------------------

    #[test]
    fn test_valid_json() {
        let output = render_to_string(&two_model_graph());
        let _ = parse(&output);
    }

    #[test]
    fn test_snapshot_lineage() {
        let graph = crate::render::test_helpers::make_sample_lineage_graph();
        let output = render_to_string(&graph);
        insta::assert_snapshot!(output);
    }

    #[test]
    fn test_snapshot_node_metadata() {
        let mut graph = LineageGraph::new();
        graph.add_node(NodeData {
            file_path: orders_sql_path(),
            description: Some("Orders mart model".into()),
            materialization: Some("table".into()),
            tags: vec!["daily".into(), "core".into()],
            columns: vec!["order_id".into(), "customer_id".into()],
            ..bare_orders_model()
        });
        let output = render_to_string(&graph);
        insta::assert_snapshot!(output);
    }

    #[test]
    fn test_snapshot_json_with_sql() {
        let mut graph = LineageGraph::new();
        graph.add_node(NodeData {
            file_path: orders_sql_path(),
            materialization: Some("table".into()),
            ..bare_orders_model()
        });
        graph.add_node(make_node(
            "source.raw.orders",
            "raw.orders",
            NodeType::Source,
        ));

        let sql_contents = HashMap::from([(
            "model.orders".to_string(),
            "SELECT * FROM {{ ref('stg_orders') }}".to_string(),
        )]);
        let output = render_with(&graph, Some(&sql_contents), &all_fields(), true);
        insta::assert_snapshot!(output);
    }

    #[test]
    fn test_compact_json_single_line() {
        let output = render_with(&two_model_graph(), None, &all_fields(), false);
        let line_count = output.trim_end().split('\n').count();
        assert_eq!(line_count, 1, "compact JSON should be a single line");
        let _ = parse(&output);
    }

    // ---- node attributes and edge metadata -------------------------------

    #[test]
    fn test_node_with_materialization_tags_columns() {
        let mut graph = LineageGraph::new();
        graph.add_node(NodeData {
            materialization: Some("table".into()),
            tags: vec!["daily".into(), "core".into()],
            columns: vec!["order_id".into(), "customer_id".into()],
            ..bare_orders_model()
        });

        let parsed = parse(&render_to_string(&graph));
        let node = &parsed["nodes"][0];
        assert_eq!(node["materialization"], "table");
        assert_eq!(node["tags"][0], "daily");
        assert_eq!(node["tags"][1], "core");
        assert_eq!(node["columns"][0], "order_id");
        assert_eq!(node["columns"][1], "customer_id");
    }

    #[test]
    fn test_transitive_edge_has_collapsed_through() {
        let mut graph = LineageGraph::new();
        let a = graph.add_node(make_node("source.raw.a", "a", NodeType::Source));
        let b = graph.add_node(make_node("model.b", "b", NodeType::Model));
        graph.add_edge(a, b, EdgeData::transitive(EdgeType::Source, 2));

        let parsed = parse(&render_to_string(&graph));
        let edges = parsed["edges"].as_array().unwrap();
        assert_eq!(edges.len(), 1);
        assert_eq!(edges[0]["edge_type"], "source");
        assert_eq!(edges[0]["collapsed_through"], 2);
    }

    #[test]
    fn test_direct_edge_omits_collapsed_through() {
        let parsed = parse(&render_to_string(&two_model_graph()));
        let edges = parsed["edges"].as_array().unwrap();
        assert!(edges[0].get("collapsed_through").is_none());
    }

    // ---- field selection -------------------------------------------------

    #[test]
    fn test_default_fields_only() {
        let mut graph = LineageGraph::new();
        graph.add_node(populated_orders_model());

        let fields = resolve_graph_fields(None, false).unwrap();
        let parsed = parse(&render_with(&graph, None, &fields, false));
        let node = &parsed["nodes"][0];

        assert_eq!(node["unique_id"], "model.orders");
        assert_eq!(node["label"], "orders");
        assert_eq!(node["node_type"], "model");
        assert_eq!(node["file_path"], "models/orders.sql");
        for absent in [
            "description",
            "materialization",
            "tags",
            "columns",
            "exposure",
        ] {
            assert!(node.get(absent).is_none());
        }
    }

    #[test]
    fn test_custom_fields() {
        let mut graph = LineageGraph::new();
        graph.add_node(NodeData {
            file_path: orders_sql_path(),
            description: Some("desc".into()),
            materialization: Some("table".into()),
            ..bare_orders_model()
        });

        let fields =
            resolve_graph_fields(Some(&["unique_id".into(), "description".into()]), false).unwrap();
        let parsed = parse(&render_with(&graph, None, &fields, false));
        let node = &parsed["nodes"][0];

        assert_eq!(node["unique_id"], "model.orders");
        assert_eq!(node["description"], "desc");
        for absent in ["label", "node_type", "file_path"] {
            assert!(node.get(absent).is_none());
        }
    }

    #[test]
    fn test_json_full_includes_all() {
        let mut graph = LineageGraph::new();
        graph.add_node(populated_orders_model());

        let fields = resolve_graph_fields(None, true).unwrap();
        let parsed = parse(&render_with(&graph, None, &fields, false));
        let node = &parsed["nodes"][0];

        assert_eq!(node["description"], "desc");
        assert_eq!(node["materialization"], "table");
        assert_eq!(node["tags"][0], "daily");
        assert_eq!(node["columns"][0], "id");
    }

    #[test]
    fn test_unknown_field_error() {
        let err = resolve_graph_fields(Some(&["unique_id".into(), "nonexistent".into()]), false)
            .expect_err("an unknown field name must be rejected");
        assert!(err.contains("nonexistent"));
        assert!(err.contains("Available fields"));
    }

    // ---- exposure metadata -----------------------------------------------

    #[test]
    fn test_exposure_fields_in_json() {
        let mut graph = LineageGraph::new();
        graph.add_node(NodeData {
            unique_id: "exposure.dashboard".into(),
            label: "dashboard".into(),
            node_type: NodeType::Exposure,
            description: Some("Main dashboard".into()),
            exposure: Some(ExposureInfo {
                label: Some("Main Dashboard".into()),
                exposure_type: Some("dashboard".into()),
                url: Some("https://bi.example.com".into()),
                maturity: Some("high".into()),
                owner: Some(OwnerInfo {
                    name: Some("Data Team".into()),
                    email: Some("data@example.com".into()),
                }),
            }),
            ..bare_orders_model()
        });

        let fields = resolve_graph_fields(None, true).unwrap();
        let parsed = parse(&render_with(&graph, None, &fields, true));
        let exposure = &parsed["nodes"][0]["exposure"];

        let expected = [
            ("label", "Main Dashboard"),
            ("type", "dashboard"),
            ("url", "https://bi.example.com"),
            ("maturity", "high"),
        ];
        for &(key, value) in expected.iter() {
            assert_eq!(exposure[key], value);
        }
        assert_eq!(exposure["owner"]["name"], "Data Team");
        assert_eq!(exposure["owner"]["email"], "data@example.com");
    }

    #[test]
    fn test_exposure_null_for_non_exposure_nodes() {
        let mut graph = LineageGraph::new();
        graph.add_node(make_node("model.orders", "orders", NodeType::Model));

        let fields = resolve_graph_fields(None, true).unwrap();
        let parsed = parse(&render_with(&graph, None, &fields, true));
        assert!(parsed["nodes"][0]["exposure"].is_null());
    }
}