use anyhow::{anyhow, Result};
use arrow::array::{Array, BooleanArray, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use iceberg::Catalog;
use super::iceberg::{IcebergWarehouse, TABLE_TEST_INVENTORY};
use super::test_results::{query_test_results, status, TestResultRow, TestSelector};
/// One test definition read from the `TABLE_TEST_INVENTORY` table.
///
/// Rows are produced from a symbol scan of a repository's sources and read
/// back by [`query_test_inventory`]; they form the left side of
/// [`join_matrix`].
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct TestInventoryRow {
    /// Repository the test lives in.
    pub repo: String,
    /// Crate containing the test; matched against a result's `suite` when joining.
    pub crate_name: String,
    /// Module path of the test (e.g. `demo::tests`); not part of [`TestInventoryRow::key`].
    pub module_path: String,
    /// Test function name.
    pub test_name: String,
    /// Source file containing the test definition.
    pub file: String,
    /// Line of the definition; negative stored values are read back as 0.
    pub line: u32,
    /// "Heavy" flag as recorded by the scanner (not interpreted in this module).
    pub is_heavy: bool,
    /// Whether the scanner recorded the test as async.
    pub is_async: bool,
}
impl TestInventoryRow {
    /// Returns the owned join key `(repo, crate_name, test_name)`.
    ///
    /// This is the key [`join_matrix`] uses to look up results, with
    /// `crate_name` playing the role of the result's `suite`. `module_path`
    /// is not included, so same-named tests in one crate share a key.
    pub fn key(&self) -> (String, String, String) {
        let Self {
            repo,
            crate_name,
            test_name,
            ..
        } = self;
        (repo.to_owned(), crate_name.to_owned(), test_name.to_owned())
    }
}
/// Per-test cell state in the test matrix, as decided by [`join_matrix`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatrixState {
    /// The newest non-listed result has status `status::PASS`.
    Ok,
    /// The newest non-listed result has a status accepted by `status::is_red`.
    Fail,
    /// No non-listed result exists, or the newest one is neither pass nor red.
    NotRun,
}
impl MatrixState {
pub fn glyph(self) -> &'static str {
match self {
MatrixState::Ok => "ok",
MatrixState::Fail => "fail",
MatrixState::NotRun => "X",
}
}
}
/// One row of the test matrix: an inventoried test plus the outcome of its newest run.
///
/// Built by [`join_matrix`]; there is exactly one row per inventory entry.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct TestMatrixRow {
    /// Repository, copied from the inventory row.
    pub repo: String,
    /// Suite name, taken from the inventory row's `crate_name`.
    pub suite: String,
    /// Test function name, copied from the inventory row.
    pub test_name: String,
    /// Heavy flag, copied from the inventory row.
    pub is_heavy: bool,
    /// Async flag, copied from the inventory row.
    pub is_async: bool,
    /// [`MatrixState::glyph`] of the computed state: `"ok"`, `"fail"` or `"X"`.
    pub state: String,
    /// Raw status of the newest non-listed result, or empty if there is none.
    pub last_status: String,
    /// Message of the newest non-listed result, or empty if there is none.
    pub message: String,
}
/// Blocking wrapper around [`query_test_inventory_async`].
///
/// Drives the async query to completion with `wh.block_on` and returns its result unchanged.
pub fn query_test_inventory(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<TestInventoryRow>> {
    let pending = query_test_inventory_async(wh, repo);
    wh.block_on(pending)
}
pub async fn query_test_inventory_async(
wh: &IcebergWarehouse,
repo: &str,
) -> Result<Vec<TestInventoryRow>> {
let batches: Vec<RecordBatch> = super::iceberg::load_and_read_filtered(
wh,
TABLE_TEST_INVENTORY,
&skade::ScanFilter::eq("repo", repo),
&[],
)
.await?;
let mut latest: Option<(String, i64)> = None;
for b in &batches {
let snaps = col::<StringArray>(b, "snapshot_id")?;
let repos = col::<StringArray>(b, "repo")?;
let ts = col::<TimestampMicrosecondArray>(b, "ts_micros")?;
for i in 0..b.num_rows() {
if repos.value(i) != repo {
continue;
}
let t = ts.value(i);
if latest.as_ref().map(|(_, lt)| t > *lt).unwrap_or(true) {
latest = Some((snaps.value(i).to_string(), t));
}
}
}
let Some((snap, _)) = latest else {
return Ok(Vec::new());
};
let mut out = Vec::new();
for b in &batches {
let snaps = col::<StringArray>(b, "snapshot_id")?;
let repos = col::<StringArray>(b, "repo")?;
let crate_name = col::<StringArray>(b, "crate_name")?;
let module_path = col::<StringArray>(b, "module_path")?;
let test_name = col::<StringArray>(b, "test_name")?;
let file = col::<StringArray>(b, "file")?;
let line = col::<Int32Array>(b, "line")?;
let is_heavy = col::<BooleanArray>(b, "is_heavy")?;
let is_async = col::<BooleanArray>(b, "is_async")?;
for i in 0..b.num_rows() {
if snaps.value(i) != snap || repos.value(i) != repo {
continue;
}
out.push(TestInventoryRow {
repo: repos.value(i).to_string(),
crate_name: crate_name.value(i).to_string(),
module_path: module_path.value(i).to_string(),
test_name: test_name.value(i).to_string(),
file: file.value(i).to_string(),
line: line.value(i).max(0) as u32,
is_heavy: is_heavy.value(i),
is_async: is_async.value(i),
});
}
}
out.sort_by(|a, b| a.key().cmp(&b.key()));
out.dedup();
Ok(out)
}
pub fn join_matrix(
inventory: &[TestInventoryRow],
results: &[TestResultRow],
) -> Vec<TestMatrixRow> {
use std::collections::HashMap;
let mut latest: HashMap<(String, String, String), &TestResultRow> = HashMap::new();
for r in results {
if status::is_listed(&r.status) {
continue;
}
let key = (r.repo.clone(), r.suite.clone(), r.test_name.clone());
match latest.get(&key) {
Some(prev) if prev.ts_micros >= r.ts_micros => {}
_ => {
latest.insert(key, r);
}
}
}
let mut out: Vec<TestMatrixRow> = inventory
.iter()
.map(|inv| {
let key = inv.key();
let (state, last_status, message) = match latest.get(&key) {
Some(r) if r.status == status::PASS => {
(MatrixState::Ok, r.status.clone(), r.message.clone())
}
Some(r) if status::is_red(&r.status) => {
(MatrixState::Fail, r.status.clone(), r.message.clone())
}
Some(r) => (MatrixState::NotRun, r.status.clone(), r.message.clone()),
None => (MatrixState::NotRun, String::new(), String::new()),
};
TestMatrixRow {
repo: inv.repo.clone(),
suite: inv.crate_name.clone(),
test_name: inv.test_name.clone(),
is_heavy: inv.is_heavy,
is_async: inv.is_async,
state: state.glyph().to_string(),
last_status,
message,
}
})
.collect();
out.sort_by(|a, b| {
(&a.repo, &a.suite, &a.test_name).cmp(&(&b.repo, &b.suite, &b.test_name))
});
out
}
/// Builds the test matrix for `repo`, blocking on the warehouse reads.
///
/// Loads the latest inventory via [`query_test_inventory`] and all results
/// for the repo via `query_test_results`, then combines them with
/// [`join_matrix`]. Errors from either read are propagated.
pub fn query_test_matrix(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<TestMatrixRow>> {
    let inventory = query_test_inventory(wh, repo)?;
    let selector = TestSelector::Repo(repo.to_string());
    let results = wh.block_on(query_test_results(wh, &selector))?;
    let matrix = join_matrix(&inventory, &results);
    Ok(matrix)
}
/// Looks up column `name` in `batch` and downcasts it to the concrete Arrow array type `T`.
///
/// The returned reference borrows from `batch`. Fails if the column is
/// absent or is not a `T`. Error messages name `test_inventory`, the only
/// table this module reads through this helper.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(column) = batch.column_by_name(name) else {
        return Err(anyhow!("test_inventory missing column `{name}`"));
    };
    column
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("test_inventory column `{name}` has unexpected type"))
}
/// Tests for the inventory/results join and the warehouse round-trip paths.
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an inventory row for test `name` in crate `suite`, located in
    /// module `<suite>::tests` at `src/lib.rs:10`, with `is_async = false`.
    fn inv(repo: &str, suite: &str, name: &str, heavy: bool) -> TestInventoryRow {
        TestInventoryRow {
            repo: repo.into(),
            crate_name: suite.into(),
            module_path: format!("{suite}::tests"),
            test_name: name.into(),
            file: "src/lib.rs".into(),
            line: 10,
            is_heavy: heavy,
            is_async: false,
        }
    }

    /// Pure join: every inventory row yields one matrix row. The newest
    /// non-listed result decides the state; no result (or listed-only) gives "X".
    #[test]
    fn join_left_outer_gives_x_for_not_run_and_ok_fail_for_results() {
        let inventory = vec![
            inv("nornir", "nornir", "never_run", false),
            inv("nornir", "nornir", "passes", false),
            inv("nornir", "nornir", "fails", false),
            inv("nornir", "nornir", "heavy_corpus", true),
        ];
        // NOTE(review): the 7th `unit` argument appears to be the timestamp
        // (100 < 200 makes the PASS the newest "passes" result) — confirm
        // against `TestResultRow::unit`.
        let results = vec![
            TestResultRow::unit("r1", "nornir", "nornir", "passes", status::FAIL, 1.0, 100, "old"),
            TestResultRow::unit("r2", "nornir", "nornir", "passes", status::PASS, 1.0, 200, ""),
            TestResultRow::unit("r2", "nornir", "nornir", "fails", status::FAIL, 1.0, 200, "boom"),
            TestResultRow::unit("r2", "nornir", "nornir", "never_run", status::LISTED, 0.0, 200, ""),
        ];
        let m = join_matrix(&inventory, &results);
        let by = |n: &str| m.iter().find(|r| r.test_name == n).unwrap();
        assert_eq!(by("never_run").state, "X", "no result + listed-only → not-run X");
        assert_eq!(by("passes").state, "ok", "newest result (pass) wins over old fail");
        assert_eq!(by("fails").state, "fail");
        assert_eq!(by("fails").message, "boom");
        assert_eq!(by("heavy_corpus").state, "X", "discovered, never run → X");
        assert!(by("heavy_corpus").is_heavy, "heavy flag rides the join");
        assert_eq!(m.len(), 4);
    }

    /// End to end against a temp warehouse:
    /// 1. write a symbol scan with two tests,
    /// 2. read the inventory back,
    /// 3. check the matrix is all "X" before any run,
    /// 4. append one PASS result and check only that test flips to "ok".
    #[test]
    fn inventory_roundtrips_and_joins_against_results() {
        use crate::knowledge::symbols::{SymbolScan, TestDefRow};
        use crate::warehouse::test_results::append_test_results;
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let scan = SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: "demo".into(),
            tests: vec![
                TestDefRow {
                    crate_name: "demo".into(), module_path: "demo::tests".into(),
                    test_name: "alpha".into(), file: "src/lib.rs".into(), line: 3,
                    is_heavy: false, is_async: false,
                },
                TestDefRow {
                    crate_name: "demo".into(), module_path: "demo::tests".into(),
                    test_name: "beta_heavy".into(), file: "src/lib.rs".into(), line: 9,
                    is_heavy: true, is_async: true,
                },
            ],
            ..Default::default()
        };
        wh.append_symbol_scan(&scan).unwrap();
        let back = query_test_inventory(&wh, "demo").unwrap();
        assert_eq!(back.len(), 2, "two tests inventoried, no build");
        assert!(back.iter().any(|r| r.test_name == "beta_heavy" && r.is_heavy && r.is_async));
        // Before any results exist, every inventoried test is not-run.
        let m0 = query_test_matrix(&wh, "demo").unwrap();
        assert!(m0.iter().all(|r| r.state == "X"), "{m0:?}");
        wh.block_on(append_test_results(
            &wh,
            &[TestResultRow::unit("run", "demo", "demo", "alpha", status::PASS, 5.0, 500, "")],
        ))
        .unwrap();
        let m1 = query_test_matrix(&wh, "demo").unwrap();
        let alpha = m1.iter().find(|r| r.test_name == "alpha").unwrap();
        let beta = m1.iter().find(|r| r.test_name == "beta_heavy").unwrap();
        assert_eq!(alpha.state, "ok", "the run filled ok");
        assert_eq!(beta.state, "X", "the un-run test stays not-run");
    }

    /// Overwrites every `*.json` file in `<root>/warehouse/nornir/<table>/metadata`
    /// with empty content and returns how many were blanked (0 if the directory
    /// does not exist).
    ///
    /// NOTE(review): assumes the warehouse lays out table metadata at that
    /// path — confirm against `IcebergWarehouse`.
    fn blank_table_metadata(root: &std::path::Path, table: &str) -> usize {
        let meta_dir = root.join("warehouse").join("nornir").join(table).join("metadata");
        let mut blanked = 0usize;
        if meta_dir.is_dir() {
            for entry in std::fs::read_dir(&meta_dir).unwrap().flatten() {
                let p = entry.path();
                if p.extension().and_then(|e| e.to_str()) == Some("json") {
                    std::fs::write(&p, b"").unwrap();
                    blanked += 1;
                }
            }
        }
        blanked
    }

    /// Blank (zero-byte) metadata files must make the inventory and results
    /// reads return `Ok` with zero rows rather than an error.
    #[test]
    fn blank_metadata_degrades_workspace_fetch_and_test_history_to_zero_rows() {
        use crate::knowledge::symbols::{SymbolScan, TestDefRow};
        use crate::warehouse::test_results::{
            append_test_results, query_test_results, TestSelector,
        };
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();
        // Seed one inventory row and one result so both tables have metadata.
        let scan = SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: "facett".into(),
            tests: vec![TestDefRow {
                crate_name: "facett".into(),
                module_path: "facett::tests".into(),
                test_name: "renders".into(),
                file: "src/lib.rs".into(),
                line: 7,
                is_heavy: false,
                is_async: false,
            }],
            ..Default::default()
        };
        wh.append_symbol_scan(&scan).unwrap();
        wh.block_on(append_test_results(
            &wh,
            &[TestResultRow::unit(
                "runF", "facett", "facett", "renders", status::PASS, 4.0, 700, "",
            )],
        ))
        .unwrap();
        assert_eq!(
            query_test_inventory(&wh, "facett").unwrap().len(),
            1,
            "seeded inventory readable before blanking"
        );
        assert_eq!(
            wh.block_on(query_test_results(&wh, &TestSelector::Repo("facett".into())))
                .unwrap()
                .len(),
            1,
            "seeded results readable before blanking"
        );
        // Release the writable handle before corrupting its files on disk.
        drop(wh);
        let blanked_inv = blank_table_metadata(&root, TABLE_TEST_INVENTORY);
        let blanked_res =
            blank_table_metadata(&root, crate::warehouse::iceberg::TABLE_TEST_RESULTS);
        // Guard against a vacuous pass: the path layout must actually have matched.
        assert!(blanked_inv > 0, "a test_inventory metadata.json was blanked");
        assert!(blanked_res > 0, "a test_results metadata.json was blanked");
        // Re-open read-only and confirm both read paths degrade to empty results.
        let ro = IcebergWarehouse::open_read_only(&root).unwrap();
        let inv = query_test_inventory(&ro, "facett");
        assert!(
            inv.is_ok(),
            "Workspaces.Fetch path (query_test_inventory) must NOT EOF on blank metadata: {inv:?}"
        );
        assert_eq!(inv.unwrap().len(), 0, "blank metadata → 0 inventory rows");
        let res = ro.block_on(query_test_results(&ro, &TestSelector::Repo("facett".into())));
        assert!(
            res.is_ok(),
            "test history / Warehouse.Scan path (query_test_results) must NOT EOF on blank metadata: {res:?}"
        );
        assert_eq!(res.unwrap().len(), 0, "blank metadata → 0 result rows");
    }
}