use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::Catalog;
use super::iceberg::{append_batch, ensure_table_schema, IcebergWarehouse, TABLE_SURFACE_COVERAGE};
pub use nornir_testmatrix::coverage::{
rows_for, seed_allowlist, stale_allowlist_entries, AllowEntry, Allowlist, CoverageRow,
CoverageSummary, GateReport, Verdict,
};
// Positional indices of the surface-coverage columns inside a record batch.
// `append_surface_coverage` assembles its column vector in exactly this order
// and `query_surface_coverage` reads the columns back by these indices, so the
// two must be kept in sync.

/// Run identifier column (read as a string array).
const COL_RUN_ID: usize = 0;
/// Row timestamp column (read as a microsecond timestamp array).
const COL_TS_MICROS: usize = 1;
/// Workspace name column (read as a string array).
const COL_WORKSPACE: usize = 2;
/// Surface key column (read as a string array).
const COL_SURFACE_KEY: usize = 3;
/// Surface kind column (read as a string array).
const COL_KIND: usize = 4;
/// Surface id column (read as a string array).
const COL_ID: usize = 5;
/// Mode column (read as a string array).
const COL_MODE: usize = 6;
/// Verdict column (read as a string array).
const COL_VERDICT: usize = 7;
/// Reason column; the reader treats it as optional and falls back to an empty string.
const COL_REASON: usize = 8;
/// Chooses which surface-coverage rows a query returns.
///
/// Derives `PartialEq`/`Eq` so selectors can be compared directly (for
/// example in assertions or when de-duplicating requests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CoverageSelector {
    /// Every stored row.
    All,
    /// Only rows whose `run_id` equals the given string.
    Run(String),
    /// Only rows whose `workspace` equals the given string.
    Workspace(String),
}
/// Appends `rows` to the surface-coverage table as a single record batch.
///
/// An empty `rows` slice returns `Ok(())` immediately, before the table is
/// loaded. Otherwise the table is loaded, passed through `ensure_table_schema`
/// together with the `surface_coverage` schema, and one Arrow column is built
/// per `CoverageRow` field in `COL_*` order.
///
/// # Errors
///
/// Propagates any failure from loading the table, reconciling its schema,
/// converting that schema to Arrow, assembling the record batch, or appending
/// it.
pub async fn append_surface_coverage(wh: &IcebergWarehouse, rows: &[CoverageRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }

    let ident = wh.table_ident(TABLE_SURFACE_COVERAGE);
    let table = wh.catalog().load_table(&ident).await?;
    let table = ensure_table_schema(
        wh.catalog(),
        &ident,
        table,
        &super::iceberg_schema::surface_coverage()?,
    )
    .await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Builds one string column directly from borrowed field values; Arrow
    // copies the bytes into its own buffer, so cloning each `String` into a
    // temporary `Vec` first would only add an allocation per cell.
    let text_col = |field: fn(&CoverageRow) -> &str| -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(rows.iter().map(field)))
    };

    // Column order must match the `COL_*` indices used when reading back.
    let cols: Vec<Arc<dyn Array>> = vec![
        text_col(|r| r.run_id.as_str()),
        Arc::new(
            TimestampMicrosecondArray::from(rows.iter().map(|r| r.ts_micros).collect::<Vec<_>>())
                .with_timezone("+00:00"),
        ),
        text_col(|r| r.workspace.as_str()),
        text_col(|r| r.surface_key.as_str()),
        text_col(|r| r.kind.as_str()),
        text_col(|r| r.id.as_str()),
        text_col(|r| r.mode.as_str()),
        text_col(|r| r.verdict.as_str()),
        text_col(|r| r.reason.as_str()),
    ];

    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Reads surface-coverage rows from the warehouse, keeping those matched by `sel`.
///
/// The whole table is scanned and the selector is applied in memory. The
/// result is sorted by `(ts_micros, surface_key)` ascending.
///
/// The `reason` column is treated as optional: if it is absent, is not a
/// string array, or the cell is null, the row's `reason` is the empty string.
///
/// # Errors
///
/// Propagates failures from loading the table, building or running the scan,
/// and collecting the batches. Also returns an error if one of the required
/// columns does not have the expected Arrow array type.
pub async fn query_surface_coverage(
    wh: &IcebergWarehouse,
    sel: &CoverageSelector,
) -> Result<Vec<CoverageRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_SURFACE_COVERAGE)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    let mut out: Vec<CoverageRow> = Vec::new();
    for b in &batches {
        let run_id = col_str(b, COL_RUN_ID)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let workspace = col_str(b, COL_WORKSPACE)?;
        let surface_key = col_str(b, COL_SURFACE_KEY)?;
        let kind = col_str(b, COL_KIND)?;
        let id = col_str(b, COL_ID)?;
        let mode = col_str(b, COL_MODE)?;
        let verdict = col_str(b, COL_VERDICT)?;
        let reason = opt_col_str(b, COL_REASON);

        for i in 0..b.num_rows() {
            // Test the selector against the borrowed column value first so
            // rejected rows never pay for the owned `String` copies below.
            let keep = match sel {
                CoverageSelector::All => true,
                CoverageSelector::Run(wanted) => run_id.value(i) == wanted.as_str(),
                CoverageSelector::Workspace(wanted) => workspace.value(i) == wanted.as_str(),
            };
            if !keep {
                continue;
            }

            out.push(CoverageRow {
                run_id: run_id.value(i).to_string(),
                ts_micros: ts.value(i),
                workspace: workspace.value(i).to_string(),
                surface_key: surface_key.value(i).to_string(),
                kind: kind.value(i).to_string(),
                id: id.value(i).to_string(),
                mode: mode.value(i).to_string(),
                verdict: verdict.value(i).to_string(),
                reason: reason
                    .filter(|r| !r.is_null(i))
                    .map(|r| r.value(i).to_string())
                    .unwrap_or_default(),
            });
        }
    }

    out.sort_by(|a, b| (a.ts_micros, &a.surface_key).cmp(&(b.ts_micros, &b.surface_key)));
    Ok(out)
}
/// Returns the rows of the most recent run recorded for `workspace`.
///
/// All rows for the workspace are fetched with `query_surface_coverage`. The
/// largest `ts_micros` is located, the `run_id` of the first row carrying that
/// timestamp is taken as the newest run, and every row with that `run_id` is
/// returned in query order. A workspace with no rows yields an empty vector.
///
/// # Errors
///
/// Propagates any error from `query_surface_coverage`.
pub async fn latest_surface_coverage(
    wh: &IcebergWarehouse,
    workspace: &str,
) -> Result<Vec<CoverageRow>> {
    let selector = CoverageSelector::Workspace(workspace.to_string());
    let mut rows = query_surface_coverage(wh, &selector).await?;

    let newest_ts = match rows.iter().map(|row| row.ts_micros).max() {
        Some(ts) => ts,
        None => return Ok(Vec::new()),
    };

    let newest_run = rows
        .iter()
        .find(|row| row.ts_micros == newest_ts)
        .map(|row| row.run_id.clone())
        .unwrap_or_default();

    // `retain` keeps the surviving rows in their existing order.
    rows.retain(|row| row.run_id == newest_run);
    Ok(rows)
}
/// Borrows column `idx` of `b` as a `StringArray`.
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx` (rather than
/// panicking, as `RecordBatch::column` would) or when that column is not a
/// `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "surface_coverage col {idx} is missing (batch has {} columns)",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("surface_coverage col {idx} is not StringArray"))
}
/// Borrows column `idx` of `b` as a `TimestampMicrosecondArray`.
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx` (rather than
/// panicking, as `RecordBatch::column` would) or when that column is not a
/// `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "surface_coverage col {idx} is missing (batch has {} columns)",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("surface_coverage col {idx} is not TimestampMicrosecondArray"))
}
/// Borrows column `idx` of `b` as a `StringArray`, if possible.
///
/// Returns `None` when the batch has no column at `idx` or when that column
/// is not a `StringArray`.
fn opt_col_str(b: &RecordBatch, idx: usize) -> Option<&StringArray> {
    let column = b.columns().get(idx)?;
    column.as_any().downcast_ref::<StringArray>()
}
#[cfg(test)]
mod tests {
    use super::*;
    use nornir_testmatrix::discover::{cli_commands, mcp_tools, viz_tabs, Surface};
    use std::collections::BTreeSet;

    /// Builds a surface from one viz tab, one MCP tool and one CLI command.
    /// The tests below expect it to produce four coverage rows.
    fn sample_surface() -> Surface {
        let mut surface = Surface::new();
        surface
            .extend(viz_tabs(["Test"]))
            .extend(mcp_tools(["search"]))
            .extend(cli_commands(["doctor"]));
        surface
    }

    /// Rows written for one run come back with their verdicts and reasons intact.
    #[test]
    fn append_query_round_trip_exact_verdicts() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        let surface = sample_surface();
        let covered: BTreeSet<String> = BTreeSet::from(["viz_tab:Test@fat".to_string()]);
        let allowlist = Allowlist {
            entries: vec![AllowEntry {
                key: "viz_tab:Test@thin".into(),
                reason: "RPC wiring tracked in n-006".into(),
            }],
        };

        let rows = rows_for("run1", "nornir", &surface, &covered, &allowlist, 1000);
        assert_eq!(rows.len(), 4);
        wh.block_on(append_surface_coverage(&wh, &rows)).unwrap();

        let by_run = CoverageSelector::Run("run1".into());
        let back = wh.block_on(query_surface_coverage(&wh, &by_run)).unwrap();
        assert_eq!(back.len(), 4);

        let row_for = |key: &str| back.iter().find(|r| r.surface_key == key).unwrap();

        let fat = row_for("viz_tab:Test@fat");
        assert_eq!(fat.verdict(), Verdict::Covered);
        assert_eq!(fat.reason, "");

        let thin = row_for("viz_tab:Test@thin");
        assert_eq!(thin.verdict(), Verdict::Allowlisted);
        assert_eq!(thin.reason, "RPC wiring tracked in n-006");

        let missing_count = back
            .iter()
            .filter(|r| r.verdict() == Verdict::Missing)
            .count();
        assert_eq!(missing_count, 2);

        let summary = CoverageSummary::from_rows(&back);
        assert_eq!(summary.total, 4);
        assert_eq!(summary.covered, 1);
        assert_eq!(summary.allowlisted, 1);
        assert_eq!(summary.gap, 2);
        assert!(!summary.green);

        let by_workspace = CoverageSelector::Workspace("nornir".into());
        let ws = wh
            .block_on(query_surface_coverage(&wh, &by_workspace))
            .unwrap();
        assert_eq!(ws.len(), 4);
    }

    /// With two runs stored, only the one with the larger timestamp is returned.
    #[test]
    fn latest_picks_newest_run() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let surface = sample_surface();

        // Older run: nothing covered.
        let older = rows_for("runA", "nornir", &surface, &BTreeSet::new(), &Allowlist::new(), 1000);
        wh.block_on(append_surface_coverage(&wh, &older)).unwrap();

        // Newer run: every node covered.
        let all_covered: BTreeSet<String> = surface.nodes.iter().map(|n| n.key_str()).collect();
        let newer = rows_for("runB", "nornir", &surface, &all_covered, &Allowlist::new(), 2000);
        wh.block_on(append_surface_coverage(&wh, &newer)).unwrap();

        let latest = wh.block_on(latest_surface_coverage(&wh, "nornir")).unwrap();
        assert_eq!(latest.len(), 4);
        assert!(latest.iter().all(|r| r.run_id == "runB"), "only the newest run's rows");

        let summary = CoverageSummary::from_rows(&latest);
        assert_eq!(summary.gap, 0);
        assert!(summary.green, "newest run is the green one");

        let none = wh.block_on(latest_surface_coverage(&wh, "ghost-ws")).unwrap();
        assert!(none.is_empty());
    }

    /// Appending an empty slice succeeds and leaves the table empty.
    #[test]
    fn empty_append_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        wh.block_on(append_surface_coverage(&wh, &[])).unwrap();

        let everything = wh
            .block_on(query_surface_coverage(&wh, &CoverageSelector::All))
            .unwrap();
        assert!(everything.is_empty());
    }
}