nornir 0.4.31

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Historize the coarsened [`super::ArchGraph`] in the Iceberg warehouse,
//! keyed by git sha — the ARCH4 storage seam.
//!
//! Every `nornir arch` generation lands one row in the `architecture_wiring`
//! Iceberg table ([`crate::warehouse::iceberg_schema::architecture_wiring`]),
//! carrying the serialized graph JSON, the rendered SVG, and counts, so any
//! past architecture (at a given git sha) is recoverable by a warehouse read —
//! exactly mirroring [`crate::docs::warehouse`] (doc_exports).
//!
//! Writes are deduplicated on `(repo, git_sha, graph_sha256)`: a re-generation
//! of byte-identical graph content returns the existing row instead of
//! appending a duplicate (best-effort read-then-append, like doc_exports).

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;

use super::ArchGraph;
use crate::warehouse::iceberg::{append_batch, IcebergWarehouse, TABLE_ARCHITECTURE_WIRING};

/// One historized architecture-wiring snapshot (metadata; the graph JSON +
/// SVG bytes live in the warehouse and round-trip via [`load_arch_wiring`]).
///
/// This is the blob-free view of a row: it is what the record and list
/// functions in this module return.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ArchWiringRecord {
    /// Row identifier; newly appended rows get a random UUID v4 string.
    /// This is the key accepted by [`load_arch_wiring`].
    pub wiring_id: String,
    /// Workspace label supplied by the caller at record time (stored as-is;
    /// not part of the dedup key).
    pub workspace: String,
    /// Repository the snapshot belongs to (part of the dedup key).
    pub repo: String,
    /// Git sha the snapshot was generated for (part of the dedup key).
    pub git_sha: String,
    /// Number of nodes in the recorded graph.
    pub node_count: i64,
    /// Number of edges in the recorded graph.
    pub edge_count: i64,
    /// Lowercase hex SHA-256 of the serialized graph JSON (part of the dedup
    /// key).
    pub graph_sha256: String,
    /// RFC3339 UTC timestamp.
    pub generated_at: String,
}

/// Hash `bytes` with SHA-256 and return the digest as lowercase hex
/// (two characters per digest byte).
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write as _;

    let digest = {
        let mut hasher = Sha256::new();
        hasher.update(bytes);
        hasher.finalize()
    };

    digest
        .iter()
        .fold(String::with_capacity(digest.len() * 2), |mut hex, byte| {
            // Formatting into a `String` cannot fail; the result is ignored.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}

/// Fetch column `name` from `batch` and downcast it to the concrete arrow
/// array type `T`.
///
/// # Errors
///
/// Fails when `batch` has no column called `name`, or when that column is
/// not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let array = match batch.column_by_name(name) {
        Some(array) => array,
        None => return Err(anyhow!("projected batch missing column `{name}`")),
    };
    match array.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected arrow type")),
    }
}

/// Render a microseconds-since-epoch timestamp as an RFC3339 UTC string.
///
/// A value that cannot be represented as a `DateTime<Utc>` is replaced by
/// the current time, so the result is not deterministic for such input.
fn rfc3339(ts_micros: i64) -> String {
    let instant = match DateTime::<Utc>::from_timestamp_micros(ts_micros) {
        Some(parsed) => parsed,
        None => Utc::now(),
    };
    instant.to_rfc3339()
}

/// Persist one architecture-wiring snapshot and return its metadata.
///
/// `graph` is serialized to JSON (the canonical stored form) and `svg` is the
/// rendered board. The SHA-256 of the graph JSON, together with `repo` and
/// `git_sha`, forms the dedup key: when a matching row already exists it is
/// returned unchanged and nothing is appended.
///
/// # Errors
///
/// Fails when the graph cannot be serialized, when the dedup lookup fails,
/// when the table cannot be loaded, or when building or appending the batch
/// fails.
pub async fn record_arch_wiring_async(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    graph: &ArchGraph,
    svg: &str,
) -> Result<ArchWiringRecord> {
    let payload = serde_json::to_vec(graph)?;
    let digest = sha256_hex(&payload);

    // Best-effort read-then-append dedup on (repo, git_sha, graph_sha256).
    if let Some(prior) = find_existing(wh, repo, git_sha, &digest).await? {
        return Ok(prior);
    }

    let now = Utc::now();
    let record = ArchWiringRecord {
        wiring_id: Uuid::new_v4().to_string(),
        workspace: workspace.to_string(),
        repo: repo.to_string(),
        git_sha: git_sha.to_string(),
        node_count: graph.nodes.len() as i64,
        edge_count: graph.edges.len() as i64,
        graph_sha256: digest,
        generated_at: now.to_rfc3339(),
    };

    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Single-row column builders, one per arrow type used by the table.
    let text = |value: &str| -> Arc<dyn Array> { Arc::new(StringArray::from(vec![value])) };
    let int = |value: i64| -> Arc<dyn Array> { Arc::new(Int64Array::from(vec![value])) };
    let blob = |value: &[u8]| -> Arc<dyn Array> { Arc::new(LargeBinaryArray::from(vec![value])) };

    // Columns are positional: they have to line up with the fields of the
    // arrow schema derived from the table (presumably the declaration order
    // of the `architecture_wiring` schema — confirm against iceberg_schema).
    let columns: Vec<Arc<dyn Array>> = vec![
        text(&record.wiring_id),
        text(&record.workspace),
        text(&record.repo),
        text(&record.git_sha),
        Arc::new(
            TimestampMicrosecondArray::from(vec![now.timestamp_micros()]).with_timezone("+00:00"),
        ),
        int(record.node_count),
        int(record.edge_count),
        text(&record.graph_sha256),
        blob(&payload),
        blob(svg.as_bytes()),
    ];

    let row = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(wh.catalog(), table, row).await?;

    Ok(record)
}

/// Blocking entry point for the CLI: runs [`record_arch_wiring_async`] to
/// completion via the warehouse's `block_on` and returns its result.
pub fn record_arch_wiring(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    graph: &ArchGraph,
    svg: &str,
) -> Result<ArchWiringRecord> {
    let pending = record_arch_wiring_async(wh, workspace, repo, git_sha, graph, svg);
    wh.block_on(pending)
}

/// Look for an already-stored row matching `(repo, git_sha, graph_sha256)`.
///
/// Returns the first matching row's metadata, or `None` when the scan yields
/// no match. Only metadata columns are projected; the blobs are not read.
async fn find_existing(
    wh: &IcebergWarehouse,
    repo: &str,
    git_sha: &str,
    graph_sha256: &str,
) -> Result<Option<ArchWiringRecord>> {
    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;

    let same_repo = Reference::new("repo").equal_to(Datum::string(repo));
    let same_sha = Reference::new("git_sha").equal_to(Datum::string(git_sha));
    let same_graph = Reference::new("graph_sha256").equal_to(Datum::string(graph_sha256));

    let projection = [
        "wiring_id", "workspace", "repo", "git_sha", "ts_micros", "node_count", "edge_count",
        "graph_sha256",
    ];
    let scan = table
        .scan()
        .with_filter(same_repo.and(same_sha).and(same_graph))
        .select(projection)
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    for batch in &batches {
        // Residual check (pushdown prunes at file granularity), so every
        // returned row is re-tested against the full key.
        let hit = rows_from_batch(batch)?.into_iter().find(|row| {
            row.repo == repo && row.git_sha == git_sha && row.graph_sha256 == graph_sha256
        });
        if hit.is_some() {
            return Ok(hit);
        }
    }
    Ok(None)
}

/// Return the metadata of every historized snapshot of `repo`, ordered by
/// the stored `ts_micros` value, newest first.
///
/// Rows are re-checked against `repo` after the scan, so only exact matches
/// are returned even if the filter lets other rows through.
pub async fn list_arch_wiring_async(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<Vec<ArchWiringRecord>> {
    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;

    let projection = [
        "wiring_id", "workspace", "repo", "git_sha", "ts_micros", "node_count", "edge_count",
        "graph_sha256",
    ];
    let scan = table
        .scan()
        .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
        .select(projection)
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    // Pair each record with its raw microsecond timestamp for sorting.
    let mut stamped: Vec<(i64, ArchWiringRecord)> = Vec::new();
    for batch in &batches {
        let micros = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        let rows = rows_from_batch(batch)?;
        // Enumerate before filtering so the index still addresses the
        // record's own row in the timestamp column.
        stamped.extend(
            rows.into_iter()
                .enumerate()
                .filter(|(_, row)| row.repo == repo)
                .map(|(idx, row)| (micros.value(idx), row)),
        );
    }

    stamped.sort_by_key(|(at, _)| std::cmp::Reverse(*at));
    Ok(stamped.into_iter().map(|(_, row)| row).collect())
}

/// Blocking entry point for the CLI: runs [`list_arch_wiring_async`] to
/// completion via the warehouse's `block_on` and returns its result.
pub fn list_arch_wiring(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<ArchWiringRecord>> {
    let pending = list_arch_wiring_async(wh, repo);
    wh.block_on(pending)
}

/// Fetch the stored graph and SVG of the snapshot identified by `wiring_id`.
///
/// Returns `Ok(None)` when no row carries that id. The graph is deserialized
/// from the stored JSON bytes; the SVG bytes are decoded as UTF-8 lossily
/// (invalid sequences would be replaced rather than reported).
///
/// # Errors
///
/// Fails when the table cannot be loaded or scanned, when an expected column
/// is missing or mistyped, or when the stored graph JSON does not
/// deserialize.
pub async fn load_arch_wiring_async(
    wh: &IcebergWarehouse,
    wiring_id: &str,
) -> Result<Option<(ArchGraph, String)>> {
    let ident = wh.table_ident(TABLE_ARCHITECTURE_WIRING);
    let table = wh.catalog().load_table(&ident).await?;

    let by_id = Reference::new("wiring_id").equal_to(Datum::string(wiring_id));
    let scan = table
        .scan()
        .with_filter(by_id)
        .select(["wiring_id", "graph_json", "svg"])
        .build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    for batch in &batches {
        let ids = col::<StringArray>(batch, "wiring_id")?;
        let graph_blobs = col::<LargeBinaryArray>(batch, "graph_json")?;
        let svg_blobs = col::<LargeBinaryArray>(batch, "svg")?;

        // The scan filter is re-checked per row; take the first exact match.
        let hit = (0..batch.num_rows()).find(|&row| ids.value(row) == wiring_id);
        if let Some(row) = hit {
            let graph: ArchGraph = serde_json::from_slice(graph_blobs.value(row))?;
            let svg = String::from_utf8_lossy(svg_blobs.value(row)).into_owned();
            return Ok(Some((graph, svg)));
        }
    }
    Ok(None)
}

/// Blocking counterpart of [`load_arch_wiring_async`]: drives it to
/// completion via the warehouse's `block_on` and returns its result.
pub fn load_arch_wiring(
    wh: &IcebergWarehouse,
    wiring_id: &str,
) -> Result<Option<(ArchGraph, String)>> {
    let pending = load_arch_wiring_async(wh, wiring_id);
    wh.block_on(pending)
}

/// Turn a projected batch into one [`ArchWiringRecord`] per row (metadata
/// only, no blobs).
///
/// # Errors
///
/// Fails when any of the eight metadata columns is missing from `batch` or
/// has an unexpected arrow type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<ArchWiringRecord>> {
    let wiring_ids = col::<StringArray>(batch, "wiring_id")?;
    let workspaces = col::<StringArray>(batch, "workspace")?;
    let repos = col::<StringArray>(batch, "repo")?;
    let git_shas = col::<StringArray>(batch, "git_sha")?;
    let stamps = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
    let node_counts = col::<Int64Array>(batch, "node_count")?;
    let edge_counts = col::<Int64Array>(batch, "edge_count")?;
    let graph_hashes = col::<StringArray>(batch, "graph_sha256")?;

    let records = (0..batch.num_rows())
        .map(|row| ArchWiringRecord {
            wiring_id: wiring_ids.value(row).to_string(),
            workspace: workspaces.value(row).to_string(),
            repo: repos.value(row).to_string(),
            git_sha: git_shas.value(row).to_string(),
            node_count: node_counts.value(row),
            edge_count: edge_counts.value(row),
            graph_sha256: graph_hashes.value(row).to_string(),
            generated_at: rfc3339(stamps.value(row)),
        })
        .collect();
    Ok(records)
}

// Round-trip tests against a throwaway warehouse opened in a temp directory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arch::{ArchEdge, ArchEdgeKind, ArchNode, NodeKind};

    /// Open a fresh warehouse rooted in a new temp dir. The `TempDir` is
    /// returned alongside so the caller keeps the directory alive for the
    /// duration of the test.
    fn wh() -> (tempfile::TempDir, IcebergWarehouse) {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        (dir, wh)
    }

    /// Minimal fixture graph: one component node, one table node, and a
    /// single `Reads` edge from the component to the table.
    fn graph() -> ArchGraph {
        ArchGraph {
            nodes: vec![
                ArchNode { id: "component:TestTab".into(), label: "TestTab".into(), kind: NodeKind::Component },
                ArchNode { id: "table:test_results".into(), label: "test_results".into(), kind: NodeKind::Table },
            ],
            edges: vec![ArchEdge {
                from: "component:TestTab".into(),
                to: "table:test_results".into(),
                kind: ArchEdgeKind::Reads,
            }],
        }
    }

    /// Recording a snapshot makes it visible to `list` and recoverable, graph
    /// and SVG included, through `load`.
    #[test]
    fn record_then_list_and_load_round_trip() {
        let (_d, wh) = wh();
        let g = graph();
        let svg = g.to_svg();
        let rec = record_arch_wiring(&wh, "ws", "nornir", "abc123", &g, &svg).unwrap();
        // Counts are derived from the fixture: two nodes, one edge.
        assert_eq!(rec.node_count, 2);
        assert_eq!(rec.edge_count, 1);
        assert_eq!(rec.git_sha, "abc123");

        let rows = list_arch_wiring(&wh, "nornir").unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].wiring_id, rec.wiring_id);

        // Full round-trip: the stored graph + svg come back byte-exact.
        let (back_g, back_svg) = load_arch_wiring(&wh, &rec.wiring_id).unwrap().unwrap();
        assert_eq!(back_g, g, "graph round-trips exactly");
        assert_eq!(back_svg, svg, "svg round-trips exactly");
        assert!(back_svg.contains(">TestTab<"));
    }

    /// Recording the same graph twice for the same repo + sha returns the
    /// first row instead of appending a second one.
    #[test]
    fn dedup_skips_identical_graph() {
        let (_d, wh) = wh();
        let g = graph();
        let svg = g.to_svg();
        let a = record_arch_wiring(&wh, "ws", "nornir", "sha", &g, &svg).unwrap();
        let b = record_arch_wiring(&wh, "ws", "nornir", "sha", &g, &svg).unwrap();
        assert_eq!(a.wiring_id, b.wiring_id, "identical graph reuses the row");
        assert_eq!(list_arch_wiring(&wh, "nornir").unwrap().len(), 1);
    }

    /// The git sha is part of the dedup key: the same graph at a different
    /// sha is stored as a separate row.
    #[test]
    fn different_git_sha_makes_new_row() {
        let (_d, wh) = wh();
        let g = graph();
        let svg = g.to_svg();
        record_arch_wiring(&wh, "ws", "nornir", "sha1", &g, &svg).unwrap();
        record_arch_wiring(&wh, "ws", "nornir", "sha2", &g, &svg).unwrap();
        assert_eq!(list_arch_wiring(&wh, "nornir").unwrap().len(), 2, "new sha => new row");
    }
}