use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;
use super::ArchGraph;
use crate::warehouse::iceberg::{append_batch, IcebergWarehouse, TABLE_ARCHITECTURE_WIRING};
/// Metadata for one recorded architecture-wiring snapshot.
///
/// This is the row summary stored in the architecture-wiring table. The large
/// payloads (graph JSON and SVG) are not included; fetch them with
/// `load_arch_wiring` using `wiring_id`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ArchWiringRecord {
/// Random UUID v4 (as a string) assigned when the row was written.
pub wiring_id: String,
/// Workspace name supplied by the caller at record time.
pub workspace: String,
/// Repository the graph was generated from.
pub repo: String,
/// Git commit the graph was generated at.
pub git_sha: String,
/// Number of nodes in the recorded graph.
pub node_count: i64,
/// Number of edges in the recorded graph.
pub edge_count: i64,
/// Lowercase hex SHA-256 of the graph's `serde_json` serialization; used for dedup.
pub graph_sha256: String,
/// RFC 3339 rendering of the row's stored microsecond timestamp.
pub generated_at: String,
}
/// Hash `bytes` with SHA-256 and render the 32-byte digest as 64 lowercase hex characters.
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let digest = Sha256::digest(bytes);
    digest
        .iter()
        .fold(String::with_capacity(digest.len() * 2), |mut hex, byte| {
            // Writing into a String cannot fail; the Result is discarded on purpose.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
/// Fetch column `name` from `batch` and downcast it to the concrete arrow array type `T`.
///
/// # Errors
/// Fails if the batch has no such column (for example, it was not projected),
/// or if the column is not of type `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let array = match batch.column_by_name(name) {
        Some(array) => array,
        None => return Err(anyhow!("projected batch missing column `{name}`")),
    };
    array
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("column `{name}` has unexpected arrow type"))
}
/// Render a microseconds-since-Unix-epoch UTC timestamp as an RFC 3339 string.
///
/// NOTE(review): if `ts_micros` is outside chrono's representable range, this
/// falls back to the *current* time. The output is then neither deterministic
/// nor faithful to the stored value. Consider surfacing an error instead.
fn rfc3339(ts_micros: i64) -> String {
    match DateTime::<Utc>::from_timestamp_micros(ts_micros) {
        Some(stamp) => stamp.to_rfc3339(),
        None => Utc::now().to_rfc3339(),
    }
}
pub async fn record_arch_wiring_async(
wh: &IcebergWarehouse,
workspace: &str,
repo: &str,
git_sha: &str,
graph: &ArchGraph,
svg: &str,
) -> Result<ArchWiringRecord> {
let graph_json = serde_json::to_vec(graph)?;
let graph_sha256 = sha256_hex(&graph_json);
if let Some(existing) = find_existing(wh, repo, git_sha, &graph_sha256).await? {
return Ok(existing);
}
let wiring_id = Uuid::new_v4().to_string();
let ts = Utc::now();
let node_count = graph.nodes.len() as i64;
let edge_count = graph.edges.len() as i64;
let table = wh
.catalog()
.load_table(&wh.table_ident(TABLE_ARCHITECTURE_WIRING))
.await?;
let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![wiring_id.clone()])),
Arc::new(StringArray::from(vec![workspace.to_string()])),
Arc::new(StringArray::from(vec![repo.to_string()])),
Arc::new(StringArray::from(vec![git_sha.to_string()])),
Arc::new(
TimestampMicrosecondArray::from(vec![ts.timestamp_micros()]).with_timezone("+00:00"),
),
Arc::new(Int64Array::from(vec![node_count])),
Arc::new(Int64Array::from(vec![edge_count])),
Arc::new(StringArray::from(vec![graph_sha256.clone()])),
Arc::new(LargeBinaryArray::from(vec![graph_json.as_slice()])),
Arc::new(LargeBinaryArray::from(vec![svg.as_bytes()])),
];
let batch = RecordBatch::try_new(schema, cols)?;
append_batch(wh.catalog(), table, batch).await?;
Ok(ArchWiringRecord {
wiring_id,
workspace: workspace.to_string(),
repo: repo.to_string(),
git_sha: git_sha.to_string(),
node_count,
edge_count,
graph_sha256,
generated_at: ts.to_rfc3339(),
})
}
/// Blocking counterpart of [`record_arch_wiring_async`], driven by the warehouse's `block_on`.
///
/// # Errors
/// Propagates any error from [`record_arch_wiring_async`].
pub fn record_arch_wiring(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    graph: &ArchGraph,
    svg: &str,
) -> Result<ArchWiringRecord> {
    let pending = record_arch_wiring_async(wh, workspace, repo, git_sha, graph, svg);
    wh.block_on(pending)
}
/// Find an already-recorded row with exactly this `(repo, git_sha, graph_sha256)`.
///
/// The Iceberg filter narrows the scan. Every candidate row is then re-checked
/// in memory before it is accepted. Returns the first match found, or `None`.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a batch lacks an expected column.
async fn find_existing(
    wh: &IcebergWarehouse,
    repo: &str,
    git_sha: &str,
    graph_sha256: &str,
) -> Result<Option<ArchWiringRecord>> {
    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;

    let same_repo = Reference::new("repo").equal_to(Datum::string(repo));
    let same_sha = Reference::new("git_sha").equal_to(Datum::string(git_sha));
    let same_graph = Reference::new("graph_sha256").equal_to(Datum::string(graph_sha256));

    let scan = table
        .scan()
        .with_filter(same_repo.and(same_sha).and(same_graph))
        .select([
            "wiring_id", "workspace", "repo", "git_sha", "ts_micros", "node_count", "edge_count",
            "graph_sha256",
        ])
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    for batch in &batches {
        let hit = rows_from_batch(batch)?.into_iter().find(|row| {
            row.repo == repo && row.git_sha == git_sha && row.graph_sha256 == graph_sha256
        });
        if hit.is_some() {
            return Ok(hit);
        }
    }
    Ok(None)
}
/// List every recorded wiring row for `repo`, newest first by stored timestamp.
///
/// Rows with equal timestamps keep their scan order.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a batch lacks an expected column.
pub async fn list_arch_wiring_async(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<Vec<ArchWiringRecord>> {
    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;
    let repo_filter = Reference::new("repo").equal_to(Datum::string(repo));
    let scan = table
        .scan()
        .with_filter(repo_filter)
        .select([
            "wiring_id", "workspace", "repo", "git_sha", "ts_micros", "node_count", "edge_count",
            "graph_sha256",
        ])
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Keep the raw micros beside each record so sorting does not re-parse strings.
    let mut stamped: Vec<(i64, ArchWiringRecord)> = Vec::new();
    for batch in &batches {
        let micros = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        let rows = rows_from_batch(batch)?;
        stamped.extend(
            rows.into_iter()
                .enumerate()
                // Re-check the filter in memory rather than trusting the scan alone.
                .filter(|(_, row)| row.repo == repo)
                .map(|(idx, row)| (micros.value(idx), row)),
        );
    }

    // Stable sort, descending by timestamp.
    stamped.sort_by_key(|(stamp, _)| std::cmp::Reverse(*stamp));
    Ok(stamped.into_iter().map(|(_, row)| row).collect())
}
/// Blocking counterpart of [`list_arch_wiring_async`], driven by the warehouse's `block_on`.
///
/// # Errors
/// Propagates any error from [`list_arch_wiring_async`].
pub fn list_arch_wiring(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<ArchWiringRecord>> {
    let pending = list_arch_wiring_async(wh, repo);
    wh.block_on(pending)
}
/// Load the stored graph and SVG for `wiring_id`.
///
/// Returns `Ok(None)` when no row carries that id. The SVG bytes are decoded
/// lossily, so invalid UTF-8 becomes U+FFFD rather than an error.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a batch lacks an expected
/// column. Also fails if the matching row's `graph_json` does not deserialize into
/// an [`ArchGraph`].
pub async fn load_arch_wiring_async(
    wh: &IcebergWarehouse,
    wiring_id: &str,
) -> Result<Option<(ArchGraph, String)>> {
    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;
    let by_id = Reference::new("wiring_id").equal_to(Datum::string(wiring_id));
    let scan = table
        .scan()
        .with_filter(by_id)
        .select(["wiring_id", "graph_json", "svg"])
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    for batch in &batches {
        let ids = col::<StringArray>(batch, "wiring_id")?;
        let graphs = col::<LargeBinaryArray>(batch, "graph_json")?;
        let svgs = col::<LargeBinaryArray>(batch, "svg")?;
        // Re-check the id in memory rather than trusting the scan filter alone.
        if let Some(row) = (0..batch.num_rows()).find(|&idx| ids.value(idx) == wiring_id) {
            let graph: ArchGraph = serde_json::from_slice(graphs.value(row))?;
            let svg = String::from_utf8_lossy(svgs.value(row)).into_owned();
            return Ok(Some((graph, svg)));
        }
    }
    Ok(None)
}
/// Blocking counterpart of [`load_arch_wiring_async`], driven by the warehouse's `block_on`.
///
/// # Errors
/// Propagates any error from [`load_arch_wiring_async`].
pub fn load_arch_wiring(
    wh: &IcebergWarehouse,
    wiring_id: &str,
) -> Result<Option<(ArchGraph, String)>> {
    let pending = load_arch_wiring_async(wh, wiring_id);
    wh.block_on(pending)
}
/// Convert every row of a projected metadata batch into an [`ArchWiringRecord`].
///
/// The batch must contain these columns: `wiring_id`, `workspace`, `repo`,
/// `git_sha`, `ts_micros`, `node_count`, `edge_count` and `graph_sha256`.
/// Nulls are not checked; null slots yield whatever the array returns for `value(i)`.
///
/// # Errors
/// Fails if any of those columns is missing or has an unexpected arrow type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<ArchWiringRecord>> {
    let id_col = col::<StringArray>(batch, "wiring_id")?;
    let workspace_col = col::<StringArray>(batch, "workspace")?;
    let repo_col = col::<StringArray>(batch, "repo")?;
    let sha_col = col::<StringArray>(batch, "git_sha")?;
    let micros_col = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
    let nodes_col = col::<Int64Array>(batch, "node_count")?;
    let edges_col = col::<Int64Array>(batch, "edge_count")?;
    let digest_col = col::<StringArray>(batch, "graph_sha256")?;

    let records: Vec<ArchWiringRecord> = (0..batch.num_rows())
        .map(|row| ArchWiringRecord {
            wiring_id: id_col.value(row).to_string(),
            workspace: workspace_col.value(row).to_string(),
            repo: repo_col.value(row).to_string(),
            git_sha: sha_col.value(row).to_string(),
            node_count: nodes_col.value(row),
            edge_count: edges_col.value(row),
            graph_sha256: digest_col.value(row).to_string(),
            generated_at: rfc3339(micros_col.value(row)),
        })
        .collect();
    Ok(records)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arch::{ArchEdge, ArchEdgeKind, ArchNode, NodeKind};

    /// Open a warehouse in a fresh temp dir.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for the test.
    fn fresh_warehouse() -> (tempfile::TempDir, IcebergWarehouse) {
        let tmp = tempfile::tempdir().unwrap();
        let warehouse = IcebergWarehouse::open(tmp.path()).unwrap();
        (tmp, warehouse)
    }

    /// Two-node graph: a component that reads a table.
    fn sample_graph() -> ArchGraph {
        let component = ArchNode {
            id: "component:TestTab".into(),
            label: "TestTab".into(),
            kind: NodeKind::Component,
        };
        let results_table = ArchNode {
            id: "table:test_results".into(),
            label: "test_results".into(),
            kind: NodeKind::Table,
        };
        let reads = ArchEdge {
            from: "component:TestTab".into(),
            to: "table:test_results".into(),
            kind: ArchEdgeKind::Reads,
        };
        ArchGraph {
            nodes: vec![component, results_table],
            edges: vec![reads],
        }
    }

    #[test]
    fn record_then_list_and_load_round_trip() {
        let (_tmp, wh) = fresh_warehouse();
        let g = sample_graph();
        let svg = g.to_svg();

        let rec = record_arch_wiring(&wh, "ws", "nornir", "abc123", &g, &svg).unwrap();
        assert_eq!(rec.node_count, 2);
        assert_eq!(rec.edge_count, 1);
        assert_eq!(rec.git_sha, "abc123");

        let listed = list_arch_wiring(&wh, "nornir").unwrap();
        assert_eq!(listed.len(), 1);
        assert_eq!(listed[0].wiring_id, rec.wiring_id);

        let loaded = load_arch_wiring(&wh, &rec.wiring_id).unwrap();
        let (back_g, back_svg) = loaded.unwrap();
        assert_eq!(back_g, g, "graph round-trips exactly");
        assert_eq!(back_svg, svg, "svg round-trips exactly");
        assert!(back_svg.contains(">TestTab<"));
    }

    #[test]
    fn dedup_skips_identical_graph() {
        let (_tmp, wh) = fresh_warehouse();
        let g = sample_graph();
        let svg = g.to_svg();

        let first = record_arch_wiring(&wh, "ws", "nornir", "sha", &g, &svg).unwrap();
        let second = record_arch_wiring(&wh, "ws", "nornir", "sha", &g, &svg).unwrap();
        assert_eq!(first.wiring_id, second.wiring_id, "identical graph reuses the row");

        let listed = list_arch_wiring(&wh, "nornir").unwrap();
        assert_eq!(listed.len(), 1);
    }

    #[test]
    fn different_git_sha_makes_new_row() {
        let (_tmp, wh) = fresh_warehouse();
        let g = sample_graph();
        let svg = g.to_svg();

        for sha in ["sha1", "sha2"] {
            record_arch_wiring(&wh, "ws", "nornir", sha, &g, &svg).unwrap();
        }

        let listed = list_arch_wiring(&wh, "nornir").unwrap();
        assert_eq!(listed.len(), 2, "new sha => new row");
    }
}