nornir 0.4.42

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
//! Historize the coarsened [`super::ArchGraph`] in the Iceberg warehouse,
//! keyed by git sha — the ARCH4 storage seam.
//!
//! Every `nornir arch` generation lands one row in the `architecture_wiring`
//! Iceberg table ([`crate::warehouse::iceberg_schema::architecture_wiring`]),
//! carrying the serialized graph JSON, the rendered SVG, and counts, so any
//! past architecture (at a given git sha) is recoverable by a warehouse read —
//! exactly mirroring [`crate::docs::warehouse`] (doc_exports).
//!
//! Writes are deduplicated on `(repo, git_sha, graph_sha256)`: a re-generation
//! of byte-identical graph content returns the existing row instead of
//! appending a duplicate (best-effort read-then-append, like doc_exports).

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Int64Array, LargeBinaryArray, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{DateTime, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use sha2::{Digest, Sha256};
use uuid::Uuid;

use super::ArchGraph;
use crate::warehouse::iceberg::{append_batch, IcebergWarehouse, TABLE_ARCHITECTURE_WIRING};

/// One historized architecture-wiring snapshot (metadata; the graph JSON +
/// SVG bytes live in the warehouse and round-trip via [`load_arch_wiring`]).
///
/// This is the blob-free view of an `architecture_wiring` row: it is what the
/// record/list calls in this module return, and it is rebuilt from projected
/// columns by `rows_from_batch`.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ArchWiringRecord {
    /// Row identifier; a UUID v4 string minted when the row is first recorded.
    /// This is the key accepted by [`load_arch_wiring`].
    pub wiring_id: String,
    /// Workspace name supplied by the caller at record time.
    pub workspace: String,
    /// Repository name; part of the dedup key and the filter used when listing.
    pub repo: String,
    /// Git sha the graph was generated at; part of the dedup key.
    pub git_sha: String,
    /// Number of nodes in the recorded graph.
    pub node_count: i64,
    /// Number of edges in the recorded graph.
    pub edge_count: i64,
    /// Lowercase hex SHA-256 of the serialized graph JSON; part of the dedup key.
    pub graph_sha256: String,
    /// RFC3339 UTC timestamp.
    pub generated_at: String,
}

/// Hash `bytes` with SHA-256 and return the digest as lowercase hex
/// (two characters per digest byte).
fn sha256_hex(bytes: &[u8]) -> String {
    use std::fmt::Write;

    let digest = Sha256::digest(bytes);
    digest
        .iter()
        .fold(String::with_capacity(digest.len() * 2), |mut hex, byte| {
            // Formatting into a `String` cannot fail, so the result is dropped.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}

/// Fetch the column called `name` from `batch` and view it as the concrete
/// arrow array type `T`.
///
/// # Errors
/// Fails when the batch has no such column, or when the column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("projected batch missing column `{name}`"));
    };
    match array.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column `{name}` has unexpected arrow type")),
    }
}

/// Render a microseconds-since-epoch value as an RFC3339 UTC string.
///
/// A value that cannot be converted to a `DateTime` falls back to the current
/// time rather than failing.
fn rfc3339(ts_micros: i64) -> String {
    let when = match DateTime::<Utc>::from_timestamp_micros(ts_micros) {
        Some(parsed) => parsed,
        None => Utc::now(),
    };
    when.to_rfc3339()
}

/// Record one architecture-wiring snapshot. The `graph` is serialized to JSON
/// (the canonical form), `svg` is the rendered board. Deduplicated by
/// `(repo, git_sha, graph_sha256)` where the hash is over the graph JSON.
///
/// When a row with the same key already exists, that row's metadata is
/// returned and nothing is appended. Otherwise a single-row batch is appended
/// to the `architecture_wiring` table and the new row's metadata is returned.
///
/// The returned `generated_at` is rendered from the same microsecond value
/// that is written to the table, so the record handed back by a first write
/// equals the record a later read (or a deduplicated re-record) reconstructs
/// for that row.
///
/// # Errors
/// Propagates graph serialization failures, catalog/table load failures,
/// scan failures from the dedup lookup, and batch construction/append failures.
pub async fn record_arch_wiring_async(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    graph: &ArchGraph,
    svg: &str,
) -> Result<ArchWiringRecord> {
    let graph_json = serde_json::to_vec(graph)?;
    let graph_sha256 = sha256_hex(&graph_json);
    // Best-effort read-then-append: reuse an existing identical row if present.
    if let Some(existing) = find_existing(wh, repo, git_sha, &graph_sha256).await? {
        return Ok(existing);
    }

    let wiring_id = Uuid::new_v4().to_string();
    // The column holds microseconds; capture that value once and derive the
    // returned string from it, so sub-microsecond digits of `Utc::now()` never
    // leak into the returned record only.
    let ts_micros = Utc::now().timestamp_micros();
    let node_count = graph.nodes.len() as i64;
    let edge_count = graph.edges.len() as i64;

    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_ARCHITECTURE_WIRING))
        .await?;
    let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    // Column order must follow the table schema; the arrays copy their input,
    // so borrowed `&str` values are enough here.
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![wiring_id.as_str()])),
        Arc::new(StringArray::from(vec![workspace])),
        Arc::new(StringArray::from(vec![repo])),
        Arc::new(StringArray::from(vec![git_sha])),
        Arc::new(TimestampMicrosecondArray::from(vec![ts_micros]).with_timezone("+00:00")),
        Arc::new(Int64Array::from(vec![node_count])),
        Arc::new(Int64Array::from(vec![edge_count])),
        Arc::new(StringArray::from(vec![graph_sha256.as_str()])),
        Arc::new(LargeBinaryArray::from(vec![graph_json.as_slice()])),
        Arc::new(LargeBinaryArray::from(vec![svg.as_bytes()])),
    ];
    let batch = RecordBatch::try_new(schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;

    Ok(ArchWiringRecord {
        wiring_id,
        workspace: workspace.to_string(),
        repo: repo.to_string(),
        git_sha: git_sha.to_string(),
        node_count,
        edge_count,
        graph_sha256,
        generated_at: rfc3339(ts_micros),
    })
}

/// Blocking counterpart of [`record_arch_wiring_async`] for the CLI: drives
/// the async recording to completion via the warehouse's `block_on`.
pub fn record_arch_wiring(
    wh: &IcebergWarehouse,
    workspace: &str,
    repo: &str,
    git_sha: &str,
    graph: &ArchGraph,
    svg: &str,
) -> Result<ArchWiringRecord> {
    let recording = record_arch_wiring_async(wh, workspace, repo, git_sha, graph, svg);
    wh.block_on(recording)
}

/// Look up an already-recorded row matching `(repo, git_sha, graph_sha256)`.
///
/// Returns the first matching row's metadata, or `None` when no row matches.
async fn find_existing(
    wh: &IcebergWarehouse,
    repo: &str,
    git_sha: &str,
    graph_sha256: &str,
) -> Result<Option<ArchWiringRecord>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_ARCHITECTURE_WIRING))
        .await?;

    let by_repo = Reference::new("repo").equal_to(Datum::string(repo));
    let by_sha = Reference::new("git_sha").equal_to(Datum::string(git_sha));
    let by_hash = Reference::new("graph_sha256").equal_to(Datum::string(graph_sha256));
    let columns = [
        "wiring_id", "workspace", "repo", "git_sha", "ts_micros", "node_count", "edge_count",
        "graph_sha256",
    ];
    let scan = table
        .scan()
        .with_filter(by_repo.and(by_sha).and(by_hash))
        .select(columns)
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    for batch in &batches {
        // Residual check (pushdown prunes at file granularity).
        let hit = rows_from_batch(batch)?.into_iter().find(|row| {
            row.repo == repo && row.git_sha == git_sha && row.graph_sha256 == graph_sha256
        });
        if hit.is_some() {
            return Ok(hit);
        }
    }
    Ok(None)
}

/// List the historized wiring snapshots recorded for `repo`, ordered from the
/// newest stored timestamp to the oldest. Only metadata is read; the graph and
/// SVG blobs are not projected.
pub async fn list_arch_wiring_async(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<Vec<ArchWiringRecord>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_ARCHITECTURE_WIRING))
        .await?;
    let columns = [
        "wiring_id", "workspace", "repo", "git_sha", "ts_micros", "node_count", "edge_count",
        "graph_sha256",
    ];
    let scan = table
        .scan()
        .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
        .select(columns)
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    // Pair each row with its raw microsecond timestamp so ordering does not
    // depend on the formatted string.
    let mut stamped: Vec<(i64, ArchWiringRecord)> = Vec::new();
    for batch in &batches {
        let stamps = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
        let rows = rows_from_batch(batch)?;
        stamped.extend(
            rows.into_iter()
                .enumerate()
                // Re-check the repo on every row returned by the scan.
                .filter(|(_, row)| row.repo == repo)
                .map(|(idx, row)| (stamps.value(idx), row)),
        );
    }
    stamped.sort_by_key(|(micros, _)| std::cmp::Reverse(*micros));
    Ok(stamped.into_iter().map(|(_, row)| row).collect())
}

/// Blocking counterpart of [`list_arch_wiring_async`] for the CLI.
pub fn list_arch_wiring(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<ArchWiringRecord>> {
    let listing = list_arch_wiring_async(wh, repo);
    wh.block_on(listing)
}

/// Fetch the stored graph and SVG for the snapshot identified by `wiring_id`.
///
/// Returns `None` when no row carries that id. The graph is deserialized from
/// the stored JSON bytes; the SVG bytes are decoded as UTF-8 lossily.
pub async fn load_arch_wiring_async(
    wh: &IcebergWarehouse,
    wiring_id: &str,
) -> Result<Option<(ArchGraph, String)>> {
    let table = wh
        .catalog()
        .load_table(&wh.table_ident(TABLE_ARCHITECTURE_WIRING))
        .await?;
    let by_id = Reference::new("wiring_id").equal_to(Datum::string(wiring_id));
    let scan = table
        .scan()
        .with_filter(by_id)
        .select(["wiring_id", "graph_json", "svg"])
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    for batch in &batches {
        let ids = col::<StringArray>(batch, "wiring_id")?;
        let graphs = col::<LargeBinaryArray>(batch, "graph_json")?;
        let svgs = col::<LargeBinaryArray>(batch, "svg")?;
        // Re-check the id per row; take the first row that really matches.
        let Some(row) = (0..batch.num_rows()).find(|&row| ids.value(row) == wiring_id) else {
            continue;
        };
        let graph: ArchGraph = serde_json::from_slice(graphs.value(row))?;
        let svg = String::from_utf8_lossy(svgs.value(row)).into_owned();
        return Ok(Some((graph, svg)));
    }
    Ok(None)
}

/// Blocking counterpart of [`load_arch_wiring_async`].
pub fn load_arch_wiring(
    wh: &IcebergWarehouse,
    wiring_id: &str,
) -> Result<Option<(ArchGraph, String)>> {
    let loading = load_arch_wiring_async(wh, wiring_id);
    wh.block_on(loading)
}

/// Merge every workspace member's **latest** persisted architecture board into
/// ONE workspace-wide [`ArchGraph`] — the fat/local path of the 🏛 Architecture
/// viz tab. For each member we read its newest `architecture_wiring` row
/// ([`list_arch_wiring`], newest-first), load that snapshot's graph
/// ([`load_arch_wiring`]), and fold its nodes + edges into the merged graph.
///
/// Nodes dedupe on `id` (first wins; later members reusing an `id` keep the
/// first label/kind). Edges dedupe on `(from, to, kind)`. The merged nodes/edges
/// are returned in deterministic sorted order so the rendered board is stable
/// regardless of member iteration order.
///
/// Returns `(merged_graph, members_with_data, errors)`:
///   * `members_with_data` — the members whose latest board loaded
///     successfully, in `members` order (so the pane's status line /
///     placeholder can name who's covered). A member is listed even when every
///     one of its nodes/edges was already merged from an earlier member.
///   * `errors` — per-member read failures as `"<member>: <message>"`, surfaced
///     in the pane WITHOUT blanking the board: a member that errors is skipped,
///     the rest still merge. A member with no recorded row is NOT an error —
///     it simply contributes nothing and is absent from `members_with_data`.
///     A listed row whose snapshot can no longer be loaded IS reported.
pub fn merged_arch_from_warehouse(
    wh: &IcebergWarehouse,
    members: &[String],
) -> (ArchGraph, Vec<String>, Vec<String>) {
    use std::collections::BTreeSet;

    // Keys already merged; `insert` returning `true` means "first sighting".
    let mut node_ids: BTreeSet<String> = BTreeSet::new();
    let mut edge_keys: BTreeSet<(String, String, &'static str)> = BTreeSet::new();
    let mut merged = ArchGraph::default();
    let mut with_data: Vec<String> = Vec::new();
    let mut errors: Vec<String> = Vec::new();

    for member in members {
        // Newest snapshot for this member (list is newest-first).
        let latest = match list_arch_wiring(wh, member) {
            Ok(rows) => rows.into_iter().next(),
            Err(e) => {
                errors.push(format!("{member}: list wiring: {e:#}"));
                continue;
            }
        };
        let Some(rec) = latest else {
            // No board recorded for this member yet — not an error.
            continue;
        };
        match load_arch_wiring(wh, &rec.wiring_id) {
            Ok(Some((graph, _svg))) => {
                merged.nodes.extend(
                    graph
                        .nodes
                        .into_iter()
                        .filter(|n| node_ids.insert(n.id.clone())),
                );
                merged.edges.extend(graph.edges.into_iter().filter(|e| {
                    edge_keys.insert((e.from.clone(), e.to.clone(), e.kind.as_str()))
                }));
                // A member that round-tripped a board counts as "with data" even
                // if every node/edge was already merged from an earlier member
                // (shared wiring), so its coverage is reported truthfully.
                with_data.push(member.clone());
            }
            Ok(None) => {
                errors.push(format!("{member}: wiring {} not found", rec.wiring_id));
            }
            Err(e) => {
                errors.push(format!("{member}: load wiring {}: {e:#}", rec.wiring_id));
            }
        }
    }

    // Deterministic order (the types are Ord) so the laid-out board is stable.
    merged.nodes.sort();
    merged.edges.sort();
    (merged, with_data, errors)
}

/// Rebuild one [`ArchWiringRecord`] per row of a projected batch (metadata
/// columns only — no blobs), preserving the batch's row order.
///
/// # Errors
/// Fails when any of the eight metadata columns is missing from the batch or
/// has an unexpected arrow type.
fn rows_from_batch(batch: &RecordBatch) -> Result<Vec<ArchWiringRecord>> {
    let wiring_ids = col::<StringArray>(batch, "wiring_id")?;
    let workspaces = col::<StringArray>(batch, "workspace")?;
    let repos = col::<StringArray>(batch, "repo")?;
    let git_shas = col::<StringArray>(batch, "git_sha")?;
    let stamps = col::<TimestampMicrosecondArray>(batch, "ts_micros")?;
    let node_counts = col::<Int64Array>(batch, "node_count")?;
    let edge_counts = col::<Int64Array>(batch, "edge_count")?;
    let graph_hashes = col::<StringArray>(batch, "graph_sha256")?;

    let records = (0..batch.num_rows())
        .map(|row| ArchWiringRecord {
            wiring_id: wiring_ids.value(row).to_string(),
            workspace: workspaces.value(row).to_string(),
            repo: repos.value(row).to_string(),
            git_sha: git_shas.value(row).to_string(),
            node_count: node_counts.value(row),
            edge_count: edge_counts.value(row),
            graph_sha256: graph_hashes.value(row).to_string(),
            generated_at: rfc3339(stamps.value(row)),
        })
        .collect();
    Ok(records)
}

// Integration-style tests: each one opens its own warehouse in a fresh temp
// directory and exercises the blocking record/list/load/merge entry points.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arch::{ArchEdge, ArchEdgeKind, ArchNode, NodeKind};
    use std::collections::BTreeSet;

    /// Open a warehouse rooted in a new temp directory. The `TempDir` guard is
    /// returned too so the caller keeps it in scope for the test's duration.
    fn wh() -> (tempfile::TempDir, IcebergWarehouse) {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        (dir, wh)
    }

    /// Small fixture graph: one component node, one table node, and a single
    /// `Reads` edge from the component to the table.
    fn graph() -> ArchGraph {
        ArchGraph {
            nodes: vec![
                ArchNode { id: "component:TestTab".into(), label: "TestTab".into(), kind: NodeKind::Component },
                ArchNode { id: "table:test_results".into(), label: "test_results".into(), kind: NodeKind::Table },
            ],
            edges: vec![ArchEdge {
                from: "component:TestTab".into(),
                to: "table:test_results".into(),
                kind: ArchEdgeKind::Reads,
            }],
        }
    }

    /// A recorded snapshot shows up in the listing and loads back unchanged.
    #[test]
    fn record_then_list_and_load_round_trip() {
        let (_d, wh) = wh();
        let g = graph();
        let svg = g.to_svg();
        let rec = record_arch_wiring(&wh, "ws", "nornir", "abc123", &g, &svg).unwrap();
        assert_eq!(rec.node_count, 2);
        assert_eq!(rec.edge_count, 1);
        assert_eq!(rec.git_sha, "abc123");

        let rows = list_arch_wiring(&wh, "nornir").unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].wiring_id, rec.wiring_id);

        // Full round-trip: the stored graph + svg come back byte-exact.
        let (back_g, back_svg) = load_arch_wiring(&wh, &rec.wiring_id).unwrap().unwrap();
        assert_eq!(back_g, g, "graph round-trips exactly");
        assert_eq!(back_svg, svg, "svg round-trips exactly");
        assert!(back_svg.contains(">TestTab<"));
    }

    /// Recording the same graph twice for the same repo + sha reuses the
    /// first row instead of appending a second one.
    #[test]
    fn dedup_skips_identical_graph() {
        let (_d, wh) = wh();
        let g = graph();
        let svg = g.to_svg();
        let a = record_arch_wiring(&wh, "ws", "nornir", "sha", &g, &svg).unwrap();
        let b = record_arch_wiring(&wh, "ws", "nornir", "sha", &g, &svg).unwrap();
        assert_eq!(a.wiring_id, b.wiring_id, "identical graph reuses the row");
        assert_eq!(list_arch_wiring(&wh, "nornir").unwrap().len(), 1);
    }

    /// The merge takes only each member's newest snapshot, dedupes shared
    /// node ids across members, reports which members had data, and treats a
    /// member with no recorded board as "no data" rather than an error.
    #[test]
    fn merge_folds_latest_per_member_and_reports_coverage() {
        let (_d, wh) = wh();
        // Member `nornir` records two snapshots; the LATEST (sha2) must win.
        let g_old = graph(); // TestTab + test_results (reads)
        record_arch_wiring(&wh, "ws", "nornir", "sha1", &g_old, &g_old.to_svg()).unwrap();
        let g_new = ArchGraph {
            nodes: vec![
                ArchNode { id: "grpc:Viz.Architecture".into(), label: "Viz.Architecture".into(), kind: NodeKind::Grpc },
                ArchNode { id: "table:architecture_wiring".into(), label: "architecture_wiring".into(), kind: NodeKind::Table },
            ],
            edges: vec![ArchEdge {
                from: "grpc:Viz.Architecture".into(),
                to: "table:architecture_wiring".into(),
                kind: ArchEdgeKind::Writes,
            }],
        };
        record_arch_wiring(&wh, "ws", "nornir", "sha2", &g_new, &g_new.to_svg()).unwrap();
        // Member `knut` records a board that SHARES the wiring table node id
        // with nornir's latest (dedupe must keep one).
        let g_knut = ArchGraph {
            nodes: vec![
                ArchNode { id: "component:KnutTab".into(), label: "KnutTab".into(), kind: NodeKind::Component },
                ArchNode { id: "table:architecture_wiring".into(), label: "architecture_wiring".into(), kind: NodeKind::Table },
            ],
            edges: vec![ArchEdge {
                from: "component:KnutTab".into(),
                to: "table:architecture_wiring".into(),
                kind: ArchEdgeKind::Reads,
            }],
        };
        record_arch_wiring(&wh, "ws", "knut", "sha9", &g_knut, &g_knut.to_svg()).unwrap();

        let members = vec!["nornir".to_string(), "knut".to_string(), "absent".to_string()];
        let (merged, with_data, errors) = merged_arch_from_warehouse(&wh, &members);

        // LATEST nornir board (Viz.Architecture + wiring table) merged with knut's
        // (KnutTab + the SHARED wiring table) = 3 unique nodes (wiring table deduped).
        assert_eq!(merged.nodes.len(), 3, "shared wiring-table node deduped: {:?}", merged.nodes);
        let ids: BTreeSet<&str> = merged.nodes.iter().map(|n| n.id.as_str()).collect();
        assert!(ids.contains("grpc:Viz.Architecture"));
        assert!(ids.contains("component:KnutTab"));
        assert!(ids.contains("table:architecture_wiring"));
        // The OLD nornir snapshot's TestTab must NOT appear (latest-only).
        assert!(!ids.contains("component:TestTab"), "stale snapshot not merged");
        // 2 distinct edges (writes + reads to the same table).
        assert_eq!(merged.edges.len(), 2);
        // Coverage: nornir + knut contributed; `absent` did not, and is no error.
        assert_eq!(with_data, vec!["nornir".to_string(), "knut".to_string()]);
        assert!(errors.is_empty(), "missing member board is not an error: {errors:?}");
        // Deterministic order: nodes sorted.
        let mut sorted = merged.nodes.clone();
        sorted.sort();
        assert_eq!(merged.nodes, sorted, "merged nodes are deterministically ordered");
    }

    /// The git sha is part of the dedup key: the same graph recorded under a
    /// different sha produces a second row.
    #[test]
    fn different_git_sha_makes_new_row() {
        let (_d, wh) = wh();
        let g = graph();
        let svg = g.to_svg();
        record_arch_wiring(&wh, "ws", "nornir", "sha1", &g, &svg).unwrap();
        record_arch_wiring(&wh, "ws", "nornir", "sha2", &g, &svg).unwrap();
        assert_eq!(list_arch_wiring(&wh, "nornir").unwrap().len(), 2, "new sha => new row");
    }
}