nornir 0.4.20

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
Documentation
//! Iceberg writer + reader for the **C6 test-matrix** (`test_results`).
//!
//! The PURE model (`TestResultRow`, `TestSelector`, `RunSummary`, the `status`
//! tags, `render_matrix` / `summarize_runs` / `rows_to_json` / `new_run_id` /
//! `short_run`) was EXTRACTED (EPIC L) into the [`nornir_testmatrix`] crate and
//! is **re-exported here verbatim**, so every existing path
//! (`nornir::warehouse::test_results::{TestResultRow, render_matrix, …}`) keeps
//! working. This module keeps only the Iceberg-coupled write/read + the
//! [`TestSink`] implementation over the warehouse.
//!
//! Write path: [`append_test_results`] appends one Iceberg snapshot per call.
//! Callers are expected to pass one run's rows (sharing a `run_id` +
//! `ts_micros`) per call, but the writer itself does not enforce that.
//!
//! Read path: [`query_test_results`] scans the table, scopes by `run_id` or
//! `repo`, and returns rows sorted by `TestResultRow::key()`.
//!
//! Schema: [`super::iceberg_schema::test_results`] — carries the EPIC L
//! `aspect` + `metric` columns (nullable/defaulted: pre-L rows read back with
//! `aspect = "unit"`, `metric = 0.0`).

use std::sync::Arc;

use anyhow::{anyhow, Result};
use arrow::array::{
    Array, Float64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;

use super::iceberg::{IcebergWarehouse, TABLE_TEST_RESULTS, append_batch};

// ─── re-export the pure model from the extracted crate ──────────────────
//
// These were defined here before EPIC L; they now live in `nornir-testmatrix`.
// Re-exporting keeps `nornir::warehouse::test_results::*` stable.
pub use nornir_testmatrix::model::{
    new_run_id, render_matrix, rows_to_json, short_run, status, summarize_runs, RunSummary,
    TestResultRow, TestSelector, ASPECT_UNIT,
};
pub use nornir_testmatrix::sink::TestSink;

// Column indices match `iceberg_schema::test_results` field order.
//
// The reader (`query_test_results`) looks columns up by POSITION, not by
// name, so these must stay in step with the order in which
// `append_test_results` builds its column vector.
// String columns (read through `col_str`).
const COL_RUN_ID: usize = 0;
const COL_REPO: usize = 1;
const COL_SUITE: usize = 2;
const COL_TEST_NAME: usize = 3;
const COL_STATUS: usize = 4;
// Float64 column (read through `col_f64`).
const COL_DURATION_MS: usize = 5;
// Microsecond timestamp column (read through `col_ts`).
const COL_TS_MICROS: usize = 6;
const COL_MESSAGE: usize = 7;
// EPIC L columns: read optionally (`opt_col_str` / `opt_col_f64`) so batches
// without them still decode.
const COL_ASPECT: usize = 8;
const COL_METRIC: usize = 9;

/// Append a batch of `test_results` rows (one Iceberg snapshot).
///
/// All `rows` are written as a single Arrow [`RecordBatch`] whose columns are
/// laid out in `COL_*` order. An empty slice is a no-op: the table is not even
/// loaded and no snapshot is produced.
///
/// A row whose `aspect` is empty is stored with [`ASPECT_UNIT`] instead, so a
/// row built without setting it still lands a valid value.
///
/// # Errors
///
/// Returns an error when the table cannot be loaded, when its current schema
/// cannot be converted to an Arrow schema, when the assembled columns do not
/// fit that schema, or when the append itself fails.
pub async fn append_test_results(wh: &IcebergWarehouse, rows: &[TestResultRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_RESULTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Order matters: it must follow the `COL_*` indices used by the reader.
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8_column(rows, |r| r.run_id.as_str()),
        utf8_column(rows, |r| r.repo.as_str()),
        utf8_column(rows, |r| r.suite.as_str()),
        utf8_column(rows, |r| r.test_name.as_str()),
        utf8_column(rows, |r| r.status.as_str()),
        Arc::new(Float64Array::from(rows.iter().map(|r| r.duration_ms).collect::<Vec<_>>())),
        Arc::new(
            TimestampMicrosecondArray::from(rows.iter().map(|r| r.ts_micros).collect::<Vec<_>>())
                .with_timezone("+00:00"),
        ),
        utf8_column(rows, |r| r.message.as_str()),
        // EPIC L: aspect + metric. Default a blank aspect to `unit` so a row
        // built without setting it still lands a valid value.
        utf8_column(rows, |r| if r.aspect.is_empty() { ASPECT_UNIT } else { r.aspect.as_str() }),
        Arc::new(Float64Array::from(rows.iter().map(|r| r.metric).collect::<Vec<_>>())),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}

/// Build one string column by BORROWING a field from every row.
///
/// The Arrow array copies the bytes into its own buffer anyway, so cloning
/// each `String` first would only add a throwaway allocation per cell.
fn utf8_column<'r>(
    rows: &'r [TestResultRow],
    field: impl Fn(&'r TestResultRow) -> &'r str,
) -> Arc<dyn Array> {
    Arc::new(StringArray::from(rows.iter().map(field).collect::<Vec<&str>>()))
}

/// Read test results, scoped by `sel`, returned sorted ascending by
/// [`TestResultRow::key`] (described upstream as
/// `(ts_micros, run_id, suite, test_name)`; the key itself is defined in
/// `nornir_testmatrix`).
///
/// The whole table is scanned and the selector is applied client-side, row by
/// row: [`TestSelector::Run`] keeps rows whose `run_id` matches,
/// [`TestSelector::Repo`] keeps rows whose `repo` matches, and
/// [`TestSelector::All`] keeps everything.
///
/// The EPIC L columns are read leniently: when `aspect` / `metric` is absent
/// from a batch, has an unexpected Arrow type, or is null for a row, the row
/// gets `aspect = ASPECT_UNIT` / `metric = 0.0`.
///
/// # Errors
///
/// Returns an error when the table cannot be loaded or scanned, when the
/// record-batch stream fails, or when one of the eight required columns does
/// not have the expected Arrow type.
pub async fn query_test_results(
    wh: &IcebergWarehouse,
    sel: &TestSelector,
) -> Result<Vec<TestResultRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_RESULTS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    let mut out: Vec<TestResultRow> = Vec::new();
    for b in &batches {
        let run_id = col_str(b, COL_RUN_ID)?;
        let repo = col_str(b, COL_REPO)?;
        let suite = col_str(b, COL_SUITE)?;
        let test_name = col_str(b, COL_TEST_NAME)?;
        let st = col_str(b, COL_STATUS)?;
        let dur = col_f64(b, COL_DURATION_MS)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let msg = col_str(b, COL_MESSAGE)?;
        // EPIC L columns are migration-safe: tables written before they existed
        // won't carry them, so read them optionally (default unit / 0.0).
        let aspect = opt_col_str(b, COL_ASPECT);
        let metric = opt_col_f64(b, COL_METRIC);
        for i in 0..b.num_rows() {
            // Decide on the borrowed column values BEFORE materialising the
            // row, so rejected rows cost no `String` allocations.
            let keep = match sel {
                TestSelector::Run(id) => id == run_id.value(i),
                TestSelector::Repo(r) => r == repo.value(i),
                TestSelector::All => true,
            };
            if !keep {
                continue;
            }
            out.push(TestResultRow {
                run_id: run_id.value(i).to_string(),
                repo: repo.value(i).to_string(),
                suite: suite.value(i).to_string(),
                test_name: test_name.value(i).to_string(),
                status: st.value(i).to_string(),
                duration_ms: dur.value(i),
                ts_micros: ts.value(i),
                message: msg.value(i).to_string(),
                aspect: aspect
                    .filter(|a| !a.is_null(i))
                    .map(|a| a.value(i).to_string())
                    .unwrap_or_else(|| ASPECT_UNIT.to_string()),
                metric: metric.filter(|m| !m.is_null(i)).map(|m| m.value(i)).unwrap_or(0.0),
            });
        }
    }
    // `key()` yields an owned key, so build it once per row instead of once
    // per comparison. Still a stable sort: the resulting order is unchanged.
    out.sort_by_cached_key(|r| r.key());
    Ok(out)
}

/// A [`TestSink`] backed by the warehouse: rows handed to it are appended to
/// the Iceberg `test_results` table. Leaf repos with no warehouse use the
/// crate's `JsonFileSink` / `NullSink` instead.
///
/// The sink only borrows the warehouse; it owns no state of its own.
pub struct IcebergTestSink<'wh> {
    /// Warehouse that receives the appended rows.
    wh: &'wh IcebergWarehouse,
}

impl<'a> IcebergTestSink<'a> {
    pub fn new(wh: &'a IcebergWarehouse) -> Self {
        Self { wh }
    }
}

impl TestSink for IcebergTestSink<'_> {
    /// Append `rows` to the warehouse's `test_results` table, blocking the
    /// calling thread until the write has finished.
    fn append(&self, rows: &[TestResultRow]) -> Result<()> {
        let warehouse = self.wh;
        // `TestSink` is a sync trait while the Iceberg writer is async, so the
        // future is driven to completion on the warehouse's own `block_on`
        // (the rest of nornir's CLI is sync over block_on too).
        let write = append_test_results(warehouse, rows);
        warehouse.block_on(write)
    }
}

// ─── column helpers ──────────────────────────────────────────────────────

/// Borrow column `idx` of `b` as a [`StringArray`].
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx`, or when that
/// column is not a `StringArray`. The bounds guard matters because
/// `RecordBatch::column` panics on an out-of-range index, and a
/// `Result`-returning helper should not panic on a short batch.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    let width = b.num_columns();
    if idx >= width {
        return Err(anyhow!("test_results col {idx} is missing (batch has {width} columns)"));
    }
    b.column(idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not StringArray"))
}

/// Borrow column `idx` of `b` as a [`Float64Array`].
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx`, or when that
/// column is not a `Float64Array`. The bounds guard matters because
/// `RecordBatch::column` panics on an out-of-range index, and a
/// `Result`-returning helper should not panic on a short batch.
fn col_f64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Float64Array> {
    let width = b.num_columns();
    if idx >= width {
        return Err(anyhow!("test_results col {idx} is missing (batch has {width} columns)"));
    }
    b.column(idx)
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not Float64Array"))
}

/// Borrow column `idx` of `b` as a [`TimestampMicrosecondArray`].
///
/// # Errors
///
/// Returns an error when the batch has no column at `idx`, or when that
/// column is not a `TimestampMicrosecondArray`. The bounds guard matters
/// because `RecordBatch::column` panics on an out-of-range index, and a
/// `Result`-returning helper should not panic on a short batch.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    let width = b.num_columns();
    if idx >= width {
        return Err(anyhow!("test_results col {idx} is missing (batch has {width} columns)"));
    }
    b.column(idx)
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not TimestampMicrosecondArray"))
}

/// Best-effort lookup of a string column.
///
/// Yields `None` when the batch has no column at `idx` (a pre-EPIC-L table
/// that predates `aspect`/`metric`) and also when the column at `idx` is not
/// a [`StringArray`]; callers fall back to a default in both cases.
fn opt_col_str(b: &RecordBatch, idx: usize) -> Option<&StringArray> {
    (idx < b.num_columns())
        .then(|| b.column(idx))
        .and_then(|col| col.as_any().downcast_ref::<StringArray>())
}

/// Best-effort lookup of a float column.
///
/// Yields `None` when the batch has no column at `idx` and also when the
/// column at `idx` is not a [`Float64Array`]; callers fall back to a default
/// in both cases.
fn opt_col_f64(b: &RecordBatch, idx: usize) -> Option<&Float64Array> {
    (idx < b.num_columns())
        .then(|| b.column(idx))
        .and_then(|col| col.as_any().downcast_ref::<Float64Array>())
}

// Integration-style tests: each one opens a fresh warehouse in a temp dir,
// writes rows, and reads them back through the public API of this module.
#[cfg(test)]
mod tests {
    use super::*;

    // End-to-end round trip: two appends (five `nornir` rows spanning runA and
    // runB, then one `znippy` row), followed by reads under every selector
    // kind and a check of the summaries / JSON / matrix rendering built from
    // the rows that came back.
    #[test]
    fn append_query_round_trip_groups_and_counts() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        // Note: this single batch deliberately mixes two runs (runA @ ts=100,
        // runB @ ts=200), so one append call carries more than one `run_id`.
        let nornir_rows = vec![
            TestResultRow::unit("runA", "nornir", "nornir", "a::passes", status::PASS, 12.0, 100, ""),
            TestResultRow::unit("runA", "nornir", "nornir", "a::fails", status::FAIL, 3.0, 100, "assert left == right"),
            TestResultRow::unit("runA", "nornir", "nornir", "a::skipped", status::IGNORED, 0.0, 100, ""),
            TestResultRow::unit("runB", "nornir", "nornir", "b::hangs", status::STALLED, 120_000.0, 200, "no output for 120s"),
            // EPIC L: an aspect row with a metric (clippy: 4 warnings).
            TestResultRow {
                run_id: "runB".into(), repo: "nornir".into(), suite: "nornir".into(),
                test_name: "clippy".into(), status: status::FAIL.into(),
                duration_ms: 50.0, ts_micros: 200, message: "4 clippy warning(s)".into(),
                aspect: "clippy".into(), metric: 4.0,
            },
        ];
        let znippy_rows = vec![TestResultRow::unit(
            "runZ", "znippy", "znippy", "z::ok", status::PASS, 1.0, 50, "",
        )];
        // Two separate appends, so the reader has to merge more than one write.
        wh.block_on(append_test_results(&wh, &nornir_rows)).unwrap();
        wh.block_on(append_test_results(&wh, &znippy_rows)).unwrap();

        // Run scope: runA has 3 cases (1 pass, 1 fail, 1 ignored).
        let a = wh
            .block_on(query_test_results(&wh, &TestSelector::Run("runA".into())))
            .unwrap();
        assert_eq!(a.len(), 3);
        let sum = summarize_runs(&a);
        assert_eq!(sum.len(), 1);
        assert_eq!((sum[0].passed, sum[0].failed, sum[0].ignored, sum[0].stalled), (1, 1, 1, 0));
        assert!(!sum[0].green(), "runA has a failing case");
        assert_eq!(sum[0].total(), 3);

        // Durations + message + aspect default round-trip exactly.
        let fail = a.iter().find(|r| r.test_name == "a::fails").unwrap();
        assert_eq!(fail.duration_ms, 3.0);
        assert_eq!(fail.message, "assert left == right");
        assert_eq!(fail.aspect, ASPECT_UNIT, "unit rows default the aspect");
        assert_eq!(fail.metric, 0.0);

        // Repo scope crosses runs: nornir has runA(3) + runB(2) = 5 cases.
        let nornir = wh
            .block_on(query_test_results(&wh, &TestSelector::Repo("nornir".into())))
            .unwrap();
        assert_eq!(nornir.len(), 5);

        // The clippy aspect row round-trips aspect + metric.
        let clippy = nornir.iter().find(|r| r.test_name == "clippy").unwrap();
        assert_eq!(clippy.aspect, "clippy");
        assert_eq!(clippy.metric, 4.0);

        let runs = summarize_runs(&nornir);
        assert_eq!(runs.len(), 2);
        // Newest run (runB, ts=200) is first; it stalled → red.
        assert_eq!(runs[0].run_id, "runB");
        assert_eq!(runs[0].stalled, 1);
        assert!(!runs[0].green());

        // All scope sees every repo's runs.
        let all = wh.block_on(query_test_results(&wh, &TestSelector::All)).unwrap();
        assert_eq!(all.len(), 6);

        // JSON is well-formed and carries the verdicts + aspect/metric.
        // (Only array length is asserted here; field contents are not checked.)
        let json = rows_to_json(&a);
        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.as_array().unwrap().len(), 3);

        // The human matrix names the red cases.
        let matrix = render_matrix(&nornir);
        assert!(matrix.contains("b::hangs"), "stalled case shown: {matrix}");
        assert!(matrix.contains("clippy"), "clippy aspect failure shown");
    }

    // Writes one non-unit aspect row through the `TestSink` trait object seam
    // (rather than calling `append_test_results` directly) and confirms it is
    // readable through the normal query path with aspect + metric intact.
    #[test]
    fn iceberg_sink_appends_via_trait() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let sink = IcebergTestSink::new(&wh);
        let rows = vec![TestResultRow {
            run_id: "rS".into(), repo: "leaf".into(), suite: "leaf".into(),
            test_name: "coverage".into(), status: status::PASS.into(),
            duration_ms: 9.0, ts_micros: 300, message: "91.3% line coverage".into(),
            aspect: "coverage".into(), metric: 91.3,
        }];
        // Drive append through the TestSink trait (the leaf-repo seam).
        sink.append(&rows).unwrap();
        let back = wh
            .block_on(query_test_results(&wh, &TestSelector::Run("rS".into())))
            .unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].aspect, "coverage");
        assert_eq!(back[0].metric, 91.3);
        assert_eq!(back[0].status, status::PASS);
    }

    // Pins the re-exported `status::is_red` classification: FAIL and STALLED
    // are red; PASS, IGNORED and SKIP are not. Needs no warehouse.
    #[test]
    fn status_red_classification() {
        assert!(status::is_red(status::FAIL));
        assert!(status::is_red(status::STALLED));
        assert!(!status::is_red(status::PASS));
        assert!(!status::is_red(status::IGNORED));
        assert!(!status::is_red(status::SKIP));
    }
}