nornir 0.4.34

Companion to cargo: dependency tracking, release gating, deploy, benchmarks, and documentation assembly. Project-agnostic.
//! Reader + **matrix join** for the build-free test INVENTORY (`test_inventory`).
//!
//! The inventory is filled by the syn knowledge scan (no `cargo build`) — see
//! [`crate::knowledge::symbols`] and the `append_symbol_scan*`/`ingest_symbol_scans*`
//! writers in [`super::iceberg`]. This module reads the **latest snapshot per
//! repo** back, and joins it against the actual run outcomes in `test_results`
//! to produce the tri-state matrix every test shows:
//!
//! ```text
//! test_inventory  LEFT JOIN  test_results   ON (repo, suite, test_name)
//!   ⇒ status = latest result's ok|fail   (a real run exists), else  X (not-run)
//! ```
//!
//! `X` is the **not-run** state: the test EXISTS (discovered on load, free) but
//! has no pass/fail verdict — either it has never been executed (execution only
//! happens in `nornir release` / `nornir test run`), or its latest result is
//! neither a pass nor a failure (e.g. ignored/skipped). The viz Test pane +
//! `nornir test list` both render the same `ok | fail | X` per test, plus the
//! `is_heavy` flag.

use anyhow::{anyhow, Result};
use arrow::array::{Array, BooleanArray, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;

use super::iceberg::{IcebergWarehouse, TABLE_TEST_INVENTORY};
use super::test_results::{query_test_results, status, TestResultRow, TestSelector};

/// One discovered test, as read back from `test_inventory`.
///
/// Every field is populated from the `test_inventory` column of the same name.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct TestInventoryRow {
    /// Repository the test was discovered in; first component of the join key.
    pub repo: String,
    /// Owning crate; used as the `suite` component of the join key.
    pub crate_name: String,
    /// Value of the `module_path` column.
    pub module_path: String,
    /// Name of the test; third component of the join key.
    pub test_name: String,
    /// Value of the `file` column (presumably the defining source file — confirm
    /// against the scanner that writes it).
    pub file: String,
    /// Value of the `line` column; the reader clamps negative stored values to 0.
    pub line: u32,
    /// Heavy-test flag; carried unchanged into the joined matrix row.
    pub is_heavy: bool,
    /// Value of the `is_async` column; carried unchanged into the joined matrix row.
    pub is_async: bool,
}

impl TestInventoryRow {
    /// Returns the owned `(repo, suite, test_name)` tuple this row is matched on
    /// when joined against `test_results`. The suite slot is filled from
    /// `crate_name`, because run rows are keyed by the binary/suite the runner
    /// reports and the inventory records that as the crate name.
    pub fn key(&self) -> (String, String, String) {
        let Self {
            repo,
            crate_name,
            test_name,
            ..
        } = self;
        (repo.clone(), crate_name.clone(), test_name.clone())
    }
}

/// Verdict for a single test in the matrix; always exactly one of three states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatrixState {
    /// The latest counted run passed.
    Ok,
    /// The latest counted run failed or stalled.
    Fail,
    /// The test is known but has no pass/fail verdict (rendered as `X`).
    NotRun,
}

impl MatrixState {
    /// Text rendered in the matrix cell for this state: `ok`, `fail`, or `X`.
    pub fn glyph(self) -> &'static str {
        match self {
            Self::NotRun => "X",
            Self::Fail => "fail",
            Self::Ok => "ok",
        }
    }
}

/// One joined matrix row: a discovered test + its `ok|fail|X` verdict (+ heavy).
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct TestMatrixRow {
    /// Repository of the inventory row this matrix row was built from.
    pub repo: String,
    /// Suite name; copied from the inventory row's `crate_name`.
    pub suite: String,
    /// Test name, copied from the inventory row.
    pub test_name: String,
    /// Heavy-test flag, copied from the inventory row.
    pub is_heavy: bool,
    /// Async flag, copied from the inventory row.
    pub is_async: bool,
    /// `"ok"` | `"fail"` | `"X"` — the tri-state cell.
    pub state: String,
    /// The raw `test_results.status` of the latest counted result, or `""` when
    /// no such result exists. May be non-empty while `state` is `"X"` (a result
    /// that is neither a pass nor a failure).
    pub last_status: String,
    /// Message of the latest counted result (`""` when there is no result; the
    /// result itself may also carry an empty message).
    pub message: String,
}

/// Blocking entry point: returns the newest inventory snapshot for `repo`
/// (the snapshot carrying the greatest `ts_micros`). The scan is filtered on
/// `repo` so data files belonging to other repos can be pruned by the planner.
///
/// Drives [`query_test_inventory_async`] to completion on the warehouse's
/// `block_on`.
pub fn query_test_inventory(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<TestInventoryRow>> {
    let pending = query_test_inventory_async(wh, repo);
    wh.block_on(pending)
}

/// Async core of [`query_test_inventory`].
///
/// Scans `test_inventory` with a `repo = <repo>` filter, finds the snapshot with
/// the greatest `ts_micros` among that repo's rows, and returns the rows of that
/// snapshot only.
///
/// The result is ordered by `(repo, crate_name, test_name)`, with the remaining
/// fields breaking ties, and contains no exact duplicate rows.
///
/// Returns an empty vector when the repo has no inventory rows at all.
///
/// # Errors
///
/// Fails when the table cannot be loaded or scanned, or when a batch is missing
/// one of the expected columns or carries it with an unexpected Arrow type.
pub async fn query_test_inventory_async(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<Vec<TestInventoryRow>> {
    /// Borrowed total order over every field of a row. The first three slots are
    /// the join key, so the primary output order is unchanged; the remaining
    /// slots make rows that are fully equal sort next to each other.
    #[allow(clippy::type_complexity)] // local helper; a named type would add nothing
    fn full_order(r: &TestInventoryRow) -> (&str, &str, &str, &str, &str, u32, bool, bool) {
        (
            r.repo.as_str(),
            r.crate_name.as_str(),
            r.test_name.as_str(),
            r.module_path.as_str(),
            r.file.as_str(),
            r.line,
            r.is_heavy,
            r.is_async,
        )
    }

    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_INVENTORY)).await?;
    let scan = table
        .scan()
        .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    // Pass 1: find the latest snapshot for this repo. On equal timestamps the
    // first snapshot encountered is kept.
    let mut latest: Option<(String, i64)> = None;
    for b in &batches {
        let snaps = col::<StringArray>(b, "snapshot_id")?;
        let repos = col::<StringArray>(b, "repo")?;
        let ts = col::<TimestampMicrosecondArray>(b, "ts_micros")?;
        for i in 0..b.num_rows() {
            // Re-check the repo per row rather than relying on the scan filter alone.
            if repos.value(i) != repo {
                continue;
            }
            let t = ts.value(i);
            if latest.as_ref().map(|(_, lt)| t > *lt).unwrap_or(true) {
                latest = Some((snaps.value(i).to_string(), t));
            }
        }
    }
    let Some((snap, _)) = latest else {
        return Ok(Vec::new());
    };

    // Pass 2: materialise the rows belonging to that snapshot.
    let mut out = Vec::new();
    for b in &batches {
        let snaps = col::<StringArray>(b, "snapshot_id")?;
        let repos = col::<StringArray>(b, "repo")?;
        let crate_name = col::<StringArray>(b, "crate_name")?;
        let module_path = col::<StringArray>(b, "module_path")?;
        let test_name = col::<StringArray>(b, "test_name")?;
        let file = col::<StringArray>(b, "file")?;
        let line = col::<Int32Array>(b, "line")?;
        let is_heavy = col::<BooleanArray>(b, "is_heavy")?;
        let is_async = col::<BooleanArray>(b, "is_async")?;
        for i in 0..b.num_rows() {
            if snaps.value(i) != snap || repos.value(i) != repo {
                continue;
            }
            out.push(TestInventoryRow {
                repo: repos.value(i).to_string(),
                crate_name: crate_name.value(i).to_string(),
                module_path: module_path.value(i).to_string(),
                test_name: test_name.value(i).to_string(),
                file: file.value(i).to_string(),
                // Clamp first so the i32 -> u32 cast can never wrap.
                line: line.value(i).max(0) as u32,
                is_heavy: is_heavy.value(i),
                is_async: is_async.value(i),
            });
        }
    }

    // `Vec::dedup` only drops *adjacent* equal rows. Sorting by the join key
    // alone can leave exact duplicates separated by a row that shares the key
    // but differs elsewhere (e.g. the same test name in another module), so sort
    // by the full row order. Rows that compare equal are identical, which makes
    // the unstable sort safe, and comparing borrowed fields avoids allocating
    // owned keys for every comparison.
    out.sort_unstable_by(|a, b| full_order(a).cmp(&full_order(b)));
    out.dedup();
    Ok(out)
}

/// Builds the tri-state matrix: each inventory row is LEFT JOINed with the
/// newest counted row of `results` that shares its `(repo, suite, test_name)`.
///
/// * newest result is a pass → `ok`
/// * newest result is red → `fail`
/// * any other result, or no result at all → `X`
///
/// `listed` result rows are ignored entirely. The function is pure — it only
/// looks at the two slices — so tests can feed it canned rows. The output holds
/// one row per inventory row, sorted by `(repo, suite, test_name)`.
pub fn join_matrix(
    inventory: &[TestInventoryRow],
    results: &[TestResultRow],
) -> Vec<TestMatrixRow> {
    use std::collections::HashMap;

    // Newest counted result per borrowed (repo, suite, test_name). Discovery-only
    // `listed` rows carry no verdict, so they never enter the map — the inventory
    // is the discovery source.
    let mut newest: HashMap<(&str, &str, &str), &TestResultRow> = HashMap::new();
    for row in results.iter().filter(|r| !status::is_listed(&r.status)) {
        newest
            .entry((row.repo.as_str(), row.suite.as_str(), row.test_name.as_str()))
            .and_modify(|held| {
                // Strictly newer wins; on a tie the row seen first is kept.
                if held.ts_micros < row.ts_micros {
                    *held = row;
                }
            })
            .or_insert(row);
    }

    let mut matrix: Vec<TestMatrixRow> = Vec::with_capacity(inventory.len());
    for inv in inventory {
        let lookup = (inv.repo.as_str(), inv.crate_name.as_str(), inv.test_name.as_str());
        let hit = newest.get(&lookup).copied();

        // A result that ran but neither passed nor failed (ignored, skipped, …)
        // has no verdict, so it lands in the not-run cell like a missing result.
        let verdict = match hit {
            Some(r) if r.status == status::PASS => MatrixState::Ok,
            Some(r) if status::is_red(&r.status) => MatrixState::Fail,
            _ => MatrixState::NotRun,
        };
        // Status and message are echoed whenever a result exists, whatever the verdict.
        let (last_status, message) = match hit {
            Some(r) => (r.status.clone(), r.message.clone()),
            None => (String::new(), String::new()),
        };

        matrix.push(TestMatrixRow {
            repo: inv.repo.clone(),
            suite: inv.crate_name.clone(),
            test_name: inv.test_name.clone(),
            is_heavy: inv.is_heavy,
            is_async: inv.is_async,
            state: verdict.glyph().to_string(),
            last_status,
            message,
        });
    }

    matrix.sort_by(|lhs, rhs| {
        lhs.repo
            .cmp(&rhs.repo)
            .then_with(|| lhs.suite.cmp(&rhs.suite))
            .then_with(|| lhs.test_name.cmp(&rhs.test_name))
    });
    matrix
}

/// Produces the joined `ok|fail|X` matrix for `repo` directly from the
/// warehouse: loads the latest `test_inventory` snapshot, then the repo's
/// `test_results` rows, and hands both to [`join_matrix`]. Shared by the CLI
/// (`nornir test list`) and the viz.
pub fn query_test_matrix(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<TestMatrixRow>> {
    let inventory = query_test_inventory(wh, repo)?;
    let selector = TestSelector::Repo(repo.to_string());
    let results = wh.block_on(query_test_results(wh, &selector))?;
    let matrix = join_matrix(&inventory, &results);
    Ok(matrix)
}

/// Looks up column `name` in `batch` and downcasts it to the concrete Arrow
/// array type `T`. Errors when the column is absent or is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(column) = batch.column_by_name(name) else {
        return Err(anyhow!("test_inventory missing column `{name}`"));
    };
    match column.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("test_inventory column `{name}` has unexpected type")),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a canned inventory row for `join_matrix` tests. Only the join-key
    /// fields and the heavy flag vary; the remaining fields are fixed filler.
    fn inv(repo: &str, suite: &str, name: &str, heavy: bool) -> TestInventoryRow {
        TestInventoryRow {
            repo: repo.into(),
            crate_name: suite.into(),
            module_path: format!("{suite}::tests"),
            test_name: name.into(),
            file: "src/lib.rs".into(),
            line: 10,
            is_heavy: heavy,
            is_async: false,
        }
    }

    /// PART B — the LEFT JOIN: a discovered test with NO run result shows `X`,
    /// one with a `pass` shows `ok`, one with a `fail` shows `fail`, and the heavy
    /// flag rides through. The newest result wins. (inject + assert)
    #[test]
    fn join_left_outer_gives_x_for_not_run_and_ok_fail_for_results() {
        let inventory = vec![
            inv("nornir", "nornir", "never_run", false),
            inv("nornir", "nornir", "passes", false),
            inv("nornir", "nornir", "fails", false),
            inv("nornir", "nornir", "heavy_corpus", true),
        ];
        let results = vec![
            // an OLD fail then a NEW pass for `passes` → newest (pass) wins.
            TestResultRow::unit("r1", "nornir", "nornir", "passes", status::FAIL, 1.0, 100, "old"),
            TestResultRow::unit("r2", "nornir", "nornir", "passes", status::PASS, 1.0, 200, ""),
            TestResultRow::unit("r2", "nornir", "nornir", "fails", status::FAIL, 1.0, 200, "boom"),
            // a `listed` discovery row must NOT count as a verdict.
            TestResultRow::unit("r2", "nornir", "nornir", "never_run", status::LISTED, 0.0, 200, ""),
        ];

        let m = join_matrix(&inventory, &results);
        // Look up a matrix row by test name; panics if the row is missing.
        let by = |n: &str| m.iter().find(|r| r.test_name == n).unwrap();

        assert_eq!(by("never_run").state, "X", "no result + listed-only → not-run X");
        assert_eq!(by("passes").state, "ok", "newest result (pass) wins over old fail");
        assert_eq!(by("fails").state, "fail");
        assert_eq!(by("fails").message, "boom");
        assert_eq!(by("heavy_corpus").state, "X", "discovered, never run → X");
        assert!(by("heavy_corpus").is_heavy, "heavy flag rides the join");

        // Every inventory test is present exactly once (it's the LEFT side).
        assert_eq!(m.len(), 4);
    }

    /// Warehouse round-trip: write a SymbolScan with tests, read the inventory
    /// back, and join it against persisted results → the tri-state matrix.
    /// The steps are order-dependent: the matrix is queried once before and once
    /// after a result row is appended.
    #[test]
    fn inventory_roundtrips_and_joins_against_results() {
        use crate::knowledge::symbols::{SymbolScan, TestDefRow};
        use crate::warehouse::test_results::append_test_results;

        // Fresh warehouse in a temp dir so the test starts from empty tables.
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();

        let scan = SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: "demo".into(),
            tests: vec![
                TestDefRow {
                    crate_name: "demo".into(), module_path: "demo::tests".into(),
                    test_name: "alpha".into(), file: "src/lib.rs".into(), line: 3,
                    is_heavy: false, is_async: false,
                },
                TestDefRow {
                    crate_name: "demo".into(), module_path: "demo::tests".into(),
                    test_name: "beta_heavy".into(), file: "src/lib.rs".into(), line: 9,
                    is_heavy: true, is_async: true,
                },
            ],
            ..Default::default()
        };
        wh.append_symbol_scan(&scan).unwrap();

        // The inventory reads back both tests with their flags intact.
        let back = query_test_inventory(&wh, "demo").unwrap();
        assert_eq!(back.len(), 2, "two tests inventoried, no build");
        assert!(back.iter().any(|r| r.test_name == "beta_heavy" && r.is_heavy && r.is_async));

        // No results yet → both are X.
        let m0 = query_test_matrix(&wh, "demo").unwrap();
        assert!(m0.iter().all(|r| r.state == "X"), "{m0:?}");

        // Run one → it becomes ok; the other stays X.
        wh.block_on(append_test_results(
            &wh,
            &[TestResultRow::unit("run", "demo", "demo", "alpha", status::PASS, 5.0, 500, "")],
        ))
        .unwrap();
        let m1 = query_test_matrix(&wh, "demo").unwrap();
        let alpha = m1.iter().find(|r| r.test_name == "alpha").unwrap();
        let beta = m1.iter().find(|r| r.test_name == "beta_heavy").unwrap();
        assert_eq!(alpha.state, "ok", "the run filled ok");
        assert_eq!(beta.state, "X", "the un-run test stays not-run");
    }
}