nornir 0.4.32

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Iceberg writer + reader for the **autonom completeness gate**
//! (`surface_coverage`, AUT2 / n-005).
//!
//! The PURE model ([`CoverageRow`], [`CoverageSummary`], [`GateReport`], the
//! [`Verdict`] tags, [`rows_for`]/[`seed_allowlist`]/[`stale_allowlist_entries`])
//! lives in [`nornir_testmatrix::coverage`] and is re-exported here verbatim, so
//! every path (`nornir::warehouse::surface_coverage::{CoverageRow, …}`) is
//! stable. This module is the Iceberg-coupled write/read seam, mirroring
//! [`super::test_results`].
//!
//! Write path: [`append_surface_coverage`] appends one Iceberg snapshot per gate
//! run; all rows share one `ts_micros`.
//!
//! Read path: [`query_surface_coverage`] scans the table, scopes by
//! [`CoverageSelector`], and returns rows sorted by `(ts_micros, surface_key)`;
//! [`latest_surface_coverage`] narrows one workspace's rows to its newest run.
//!
//! Schema: [`super::iceberg_schema::surface_coverage`] — additive (ships via the
//! standard schema-evolution path; old binaries keep reading).

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::Catalog;

use super::iceberg::{append_batch, ensure_table_schema, IcebergWarehouse, TABLE_SURFACE_COVERAGE};

// ─── re-export the pure model from the extracted crate ──────────────────────
pub use nornir_testmatrix::coverage::{
    rows_for, seed_allowlist, stale_allowlist_entries, AllowEntry, Allowlist, CoverageRow,
    CoverageSummary, GateReport, Verdict,
};

// Positional column indices into a `surface_coverage` RecordBatch. The
// sequence mirrors the field order of `iceberg_schema::surface_coverage`; each
// index is derived from its predecessor so the run stays dense and ordered.
const COL_RUN_ID: usize = 0;
const COL_TS_MICROS: usize = COL_RUN_ID + 1;
const COL_WORKSPACE: usize = COL_TS_MICROS + 1;
const COL_SURFACE_KEY: usize = COL_WORKSPACE + 1;
const COL_KIND: usize = COL_SURFACE_KEY + 1;
const COL_ID: usize = COL_KIND + 1;
const COL_MODE: usize = COL_ID + 1;
const COL_VERDICT: usize = COL_MODE + 1;
const COL_REASON: usize = COL_VERDICT + 1;

/// Scope a read of the `surface_coverage` table.
///
/// Selectors are plain data: they can be compared and hashed, for example to
/// dedupe or cache queries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum CoverageSelector {
    /// Every coverage row ever recorded.
    All,
    /// Only one gate run's rows (exact match on `run_id`).
    Run(String),
    /// Only one workspace's rows (exact match on `workspace`), across every run.
    Workspace(String),
}

/// Append a batch of coverage rows as ONE Iceberg snapshot.
///
/// A no-op on an empty slice: no table load and no snapshot. Each row already
/// carries its own `run_id` / `ts_micros` / `workspace`. This function writes
/// them as given and neither rewrites nor validates them.
///
/// Before writing, the table is forward-evolved with [`ensure_table_schema`],
/// which is a no-op once current. The Arrow batch is then built against the
/// table's resulting current schema.
///
/// # Errors
/// Propagates catalog load, schema-evolution and append failures. Also fails if
/// the columns built here do not line up with the table's current Arrow schema.
pub async fn append_surface_coverage(wh: &IcebergWarehouse, rows: &[CoverageRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let ident = wh.table_ident(TABLE_SURFACE_COVERAGE);
    let table = wh.catalog().load_table(&ident).await?;
    // Forward-evolve a table created by an older binary (no-op once current) —
    // same safety contract as the other fact writers.
    let table = ensure_table_schema(
        wh.catalog(),
        &ident,
        table,
        &super::iceberg_schema::surface_coverage()?,
    )
    .await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Build a Utf8 column straight from borrowed `&str`s. Arrow copies the bytes
    // into its own buffer anyway, so a per-row `String` clone is pure waste.
    fn utf8<'r>(
        rows: &'r [CoverageRow],
        field: impl Fn(&'r CoverageRow) -> &'r str,
    ) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(rows.iter().map(field)))
    }

    // Order must match the `COL_*` indices / `iceberg_schema::surface_coverage`.
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8(rows, |r| r.run_id.as_str()),
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        utf8(rows, |r| r.workspace.as_str()),
        utf8(rows, |r| r.surface_key.as_str()),
        utf8(rows, |r| r.kind.as_str()),
        utf8(rows, |r| r.id.as_str()),
        utf8(rows, |r| r.mode.as_str()),
        utf8(rows, |r| r.verdict.as_str()),
        utf8(rows, |r| r.reason.as_str()),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Read coverage rows, scoped by `sel`, sorted by `(ts_micros, surface_key)`.
///
/// Performs a full table scan. The selector is then applied per row in memory,
/// on the borrowed column values, before any owned [`CoverageRow`] is built.
/// A missing or null `reason` column value reads back as `""`.
///
/// # Errors
/// Propagates catalog/scan failures. Also fails if a required column is not
/// of the expected Arrow type.
pub async fn query_surface_coverage(
    wh: &IcebergWarehouse,
    sel: &CoverageSelector,
) -> Result<Vec<CoverageRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_SURFACE_COVERAGE)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    let mut out: Vec<CoverageRow> = Vec::new();
    for b in &batches {
        let run_id = col_str(b, COL_RUN_ID)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let workspace = col_str(b, COL_WORKSPACE)?;
        let surface_key = col_str(b, COL_SURFACE_KEY)?;
        let kind = col_str(b, COL_KIND)?;
        let node_id = col_str(b, COL_ID)?;
        let mode = col_str(b, COL_MODE)?;
        let verdict = col_str(b, COL_VERDICT)?;
        // Best-effort: an absent / non-Utf8 reason column yields `None` here,
        // and both that and a null slot become an empty reason below.
        let reason = opt_col_str(b, COL_REASON);
        for i in 0..b.num_rows() {
            // Scope first, on borrowed values, so rows the selector drops never
            // pay for eight String allocations.
            let keep = match sel {
                CoverageSelector::All => true,
                CoverageSelector::Run(want) => run_id.value(i) == want.as_str(),
                CoverageSelector::Workspace(want) => workspace.value(i) == want.as_str(),
            };
            if !keep {
                continue;
            }
            out.push(CoverageRow {
                run_id: run_id.value(i).to_string(),
                ts_micros: ts.value(i),
                workspace: workspace.value(i).to_string(),
                surface_key: surface_key.value(i).to_string(),
                kind: kind.value(i).to_string(),
                id: node_id.value(i).to_string(),
                mode: mode.value(i).to_string(),
                verdict: verdict.value(i).to_string(),
                reason: reason
                    .filter(|r| !r.is_null(i))
                    .map(|r| r.value(i).to_string())
                    .unwrap_or_default(),
            });
        }
    }
    out.sort_by(|a, b| {
        a.ts_micros
            .cmp(&b.ts_micros)
            .then_with(|| a.surface_key.cmp(&b.surface_key))
    });
    Ok(out)
}

/// Read the LATEST gate run's coverage rows for a workspace.
///
/// The newest run is the one with the greatest `(ts_micros, run_id)`. The max
/// timestamp wins, and `run_id` only breaks an exact-microsecond tie, so the
/// choice is deterministic. Rows are kept only when BOTH their `ts_micros` and
/// `run_id` match that run. This prevents an earlier gate run that reused the
/// same `run_id` from leaking stale rows into the result.
///
/// The viz Test pane + `test_coverage` tool read this to show the current
/// verdict. Returns an empty vec when no gate run has been persisted for the
/// workspace. Rows keep the `(ts_micros, surface_key)` order of
/// [`query_surface_coverage`].
///
/// # Errors
/// Propagates any error from [`query_surface_coverage`].
pub async fn latest_surface_coverage(
    wh: &IcebergWarehouse,
    workspace: &str,
) -> Result<Vec<CoverageRow>> {
    let mut rows =
        query_surface_coverage(wh, &CoverageSelector::Workspace(workspace.to_string())).await?;
    let Some((latest_ts, latest_run)) = rows
        .iter()
        .map(|r| (r.ts_micros, r.run_id.as_str()))
        .max()
        .map(|(ts, run)| (ts, run.to_owned()))
    else {
        return Ok(Vec::new());
    };
    rows.retain(|r| r.ts_micros == latest_ts && r.run_id == latest_run);
    Ok(rows)
}

/// Borrow column `idx` of `b` as a Utf8 [`StringArray`].
///
/// # Errors
/// Returns an error naming the column index when the column holds any other
/// Arrow array type.
///
/// # Panics
/// Panics (inside `RecordBatch::column`) if `idx` is out of range.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    let column = b.column(idx);
    match column.as_any().downcast_ref::<StringArray>() {
        Some(strings) => Ok(strings),
        None => Err(anyhow!("surface_coverage col {idx} is not StringArray")),
    }
}

/// Borrow column `idx` of `b` as a [`TimestampMicrosecondArray`].
///
/// # Errors
/// Returns an error naming the column index when the column holds any other
/// Arrow array type.
///
/// # Panics
/// Panics (inside `RecordBatch::column`) if `idx` is out of range.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    let column = b.column(idx);
    if let Some(stamps) = column.as_any().downcast_ref::<TimestampMicrosecondArray>() {
        return Ok(stamps);
    }
    Err(anyhow!("surface_coverage col {idx} is not TimestampMicrosecondArray"))
}

/// Best-effort counterpart of `col_str`: borrows column `idx` as a
/// [`StringArray`]. Returns `None` instead of failing when `idx` is past the
/// batch's last column or the column is some other Arrow type. Never panics on
/// an out-of-range index.
fn opt_col_str(b: &RecordBatch, idx: usize) -> Option<&StringArray> {
    (idx < b.num_columns())
        .then(|| b.column(idx))
        .and_then(|column| column.as_any().downcast_ref::<StringArray>())
}

// Round-trip tests for the Iceberg write/read seam. Each test opens its own
// warehouse in a fresh `tempfile::tempdir()` and drives the async API through
// `IcebergWarehouse::block_on`.
#[cfg(test)]
mod tests {
    use super::*;
    use nornir_testmatrix::discover::{cli_commands, mcp_tools, viz_tabs, Surface};
    use std::collections::BTreeSet;

    /// A small fixed surface: viz tab `Test`, MCP tool `search`, CLI command
    /// `doctor`. The round-trip test asserts it expands to exactly 4 rows,
    /// including the keys `viz_tab:Test@fat` and `viz_tab:Test@thin`.
    fn sample_surface() -> Surface {
        let mut s = Surface::new();
        s.extend(viz_tabs(["Test"]))
            .extend(mcp_tools(["search"]))
            .extend(cli_commands(["doctor"]));
        s
    }

    /// LAW-1 round-trip: build a surface, difference it against a known covered +
    /// allowlist set, persist the per-node rows, read them back, and assert the
    /// exact verdicts + the rolled-up summary.
    #[test]
    fn append_query_round_trip_exact_verdicts() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        let surface = sample_surface(); // 4 nodes
        // One node covered, one allowlisted; the remaining two must come back Missing.
        let covered: BTreeSet<String> = ["viz_tab:Test@fat".to_string()].into_iter().collect();
        let allowlist = Allowlist {
            entries: vec![AllowEntry {
                key: "viz_tab:Test@thin".into(),
                reason: "RPC wiring tracked in n-006".into(),
            }],
        };
        let rows = rows_for("run1", "nornir", &surface, &covered, &allowlist, 1000);
        assert_eq!(rows.len(), 4);

        wh.block_on(append_surface_coverage(&wh, &rows)).unwrap();

        // Run scope returns exactly the 4 rows.
        let back = wh
            .block_on(query_surface_coverage(&wh, &CoverageSelector::Run("run1".into())))
            .unwrap();
        assert_eq!(back.len(), 4);

        // The covered node round-trips as covered with no reason.
        let fat = back.iter().find(|r| r.surface_key == "viz_tab:Test@fat").unwrap();
        assert_eq!(fat.verdict(), Verdict::Covered);
        assert_eq!(fat.reason, "");

        // The allowlisted node round-trips its reason exactly.
        let thin = back.iter().find(|r| r.surface_key == "viz_tab:Test@thin").unwrap();
        assert_eq!(thin.verdict(), Verdict::Allowlisted);
        assert_eq!(thin.reason, "RPC wiring tracked in n-006");

        // The two un-covered, un-allowlisted nodes are missing.
        let missing: Vec<_> = back.iter().filter(|r| r.verdict() == Verdict::Missing).collect();
        assert_eq!(missing.len(), 2);

        // The rolled-up summary matches.
        let summary = CoverageSummary::from_rows(&back);
        assert_eq!(summary.total, 4);
        assert_eq!(summary.covered, 1);
        assert_eq!(summary.allowlisted, 1);
        assert_eq!(summary.gap, 2);
        assert!(!summary.green);

        // Workspace scope crosses runs (only one here).
        let ws = wh
            .block_on(query_surface_coverage(&wh, &CoverageSelector::Workspace("nornir".into())))
            .unwrap();
        assert_eq!(ws.len(), 4);
    }

    /// `latest_surface_coverage` returns only the most-recent run's rows.
    #[test]
    fn latest_picks_newest_run() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let surface = sample_surface();

        // Run A (ts=1000): nothing covered → 4 missing.
        let a = rows_for("runA", "nornir", &surface, &BTreeSet::new(), &Allowlist::new(), 1000);
        wh.block_on(append_surface_coverage(&wh, &a)).unwrap();
        // Run B (ts=2000): all covered → 0 missing (the gate went green).
        let all_covered: BTreeSet<String> = surface.nodes.iter().map(|n| n.key_str()).collect();
        let b = rows_for("runB", "nornir", &surface, &all_covered, &Allowlist::new(), 2000);
        wh.block_on(append_surface_coverage(&wh, &b)).unwrap();

        // Both runs are now in the table; only run B (the higher ts) should surface.
        let latest = wh.block_on(latest_surface_coverage(&wh, "nornir")).unwrap();
        assert_eq!(latest.len(), 4);
        assert!(latest.iter().all(|r| r.run_id == "runB"), "only the newest run's rows");
        let summary = CoverageSummary::from_rows(&latest);
        assert_eq!(summary.gap, 0);
        assert!(summary.green, "newest run is the green one");

        // A workspace with no rows yields an empty latest.
        let none = wh.block_on(latest_surface_coverage(&wh, "ghost-ws")).unwrap();
        assert!(none.is_empty());
    }

    /// Appending an empty slice writes nothing, so a full-table read stays empty.
    #[test]
    fn empty_append_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        wh.block_on(append_surface_coverage(&wh, &[])).unwrap();
        let all = wh
            .block_on(query_surface_coverage(&wh, &CoverageSelector::All))
            .unwrap();
        assert!(all.is_empty());
    }
}