use anyhow::{anyhow, Result};
use arrow::array::{Array, BooleanArray, Int32Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::Catalog;
use super::iceberg::{IcebergWarehouse, TABLE_TEST_INVENTORY};
use super::test_results::{query_test_results, status, TestResultRow, TestSelector};
/// One test definition recorded in the test-inventory table for a repository
/// snapshot, as returned by `query_test_inventory` / `query_test_inventory_async`.
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct TestInventoryRow {
    /// Repository the test was inventoried for.
    pub repo: String,
    /// Crate containing the test; `join_matrix` matches it against a result's `suite`.
    pub crate_name: String,
    /// Module path of the test (e.g. `demo::tests`).
    pub module_path: String,
    /// Test name. Together with `repo` and `crate_name` it forms `key()`; note that
    /// `module_path` is not part of that key.
    pub test_name: String,
    /// File path of the test definition as recorded by the scan (e.g. `src/lib.rs`).
    pub file: String,
    /// Line of the test definition; the stored Int32 value is clamped to >= 0 when read.
    pub line: u32,
    /// Value of the inventory's `is_heavy` column (presumably marks expensive tests — TODO confirm).
    pub is_heavy: bool,
    /// Value of the inventory's `is_async` column (presumably an async test fn — TODO confirm).
    pub is_async: bool,
}
impl TestInventoryRow {
    /// Identity used to pair an inventory row with test results: the owned triple
    /// `(repo, crate_name, test_name)`. `module_path` is deliberately excluded, so
    /// same-named tests in different modules of one crate share a key.
    pub fn key(&self) -> (String, String, String) {
        let Self {
            repo,
            crate_name,
            test_name,
            ..
        } = self;
        (repo.to_owned(), crate_name.to_owned(), test_name.to_owned())
    }
}
/// Per-test outcome shown in the test matrix, derived by `join_matrix` from the
/// newest non-"listed" result of each inventoried test.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MatrixState {
    /// The newest result's status equals `status::PASS`.
    Ok,
    /// The newest result's status satisfies `status::is_red`.
    Fail,
    /// There is no usable result, or the newest one is neither pass nor red.
    NotRun,
}
impl MatrixState {
pub fn glyph(self) -> &'static str {
match self {
MatrixState::Ok => "ok",
MatrixState::Fail => "fail",
MatrixState::NotRun => "X",
}
}
}
/// One row of the test matrix: an inventoried test joined (left outer) with its
/// newest non-"listed" result. Built by `join_matrix`.
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
pub struct TestMatrixRow {
    /// Repository of the inventoried test.
    pub repo: String,
    /// The inventory row's `crate_name`.
    pub suite: String,
    /// Test name from the inventory.
    pub test_name: String,
    /// Copied from the inventory row.
    pub is_heavy: bool,
    /// Copied from the inventory row.
    pub is_async: bool,
    /// `MatrixState::glyph()` of the derived state: `"ok"`, `"fail"` or `"X"`.
    pub state: String,
    /// Raw status of the newest matching result, or empty when there is none.
    pub last_status: String,
    /// Message of the newest matching result, or empty when there is none.
    pub message: String,
}
/// Synchronous wrapper around [`query_test_inventory_async`]. It drives the query
/// to completion with `IcebergWarehouse::block_on`.
///
/// # Errors
/// Returns whatever error the async query produces.
pub fn query_test_inventory(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<TestInventoryRow>> {
    let pending = query_test_inventory_async(wh, repo);
    wh.block_on(pending)
}
/// Loads the newest test-inventory snapshot recorded for `repo`.
///
/// Steps:
/// 1. Scan `TABLE_TEST_INVENTORY` with a `repo = <repo>` filter.
/// 2. Select the `snapshot_id` whose `ts_micros` is greatest. On a tie, the first
///    row seen wins.
/// 3. Return every row of that snapshot for `repo`, ordered by
///    `(repo, crate_name, test_name)` with exact duplicates removed.
///
/// Returns an empty `Vec` when the table has no rows for `repo`.
///
/// # Errors
///
/// Propagates catalog and scan failures. Also fails if a required column is
/// missing or is not of the expected Arrow array type.
pub async fn query_test_inventory_async(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<Vec<TestInventoryRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_INVENTORY)).await?;
    let scan = table
        .scan()
        .with_filter(Reference::new("repo").equal_to(Datum::string(repo)))
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    let Some(snap) = latest_inventory_snapshot(&batches, repo)? else {
        return Ok(Vec::new());
    };
    let mut out = inventory_rows_in_snapshot(&batches, repo, &snap)?;

    // `dedup` only removes *adjacent* equal rows, so the sort must order on every
    // field. Sorting on `key()` alone would leave rows that share a key but differ
    // elsewhere (e.g. the same test name in two modules of one crate) interleaved
    // with their duplicates, and those duplicates would survive.
    out.sort_unstable_by(|a, b| inventory_order(a).cmp(&inventory_order(b)));
    out.dedup();
    Ok(out)
}

/// Returns the `snapshot_id` of the `repo` row with the greatest `ts_micros`.
/// Returns `None` if no batch contains a row for `repo`. On equal timestamps the
/// first row encountered is kept.
fn latest_inventory_snapshot(batches: &[RecordBatch], repo: &str) -> Result<Option<String>> {
    let mut latest: Option<(String, i64)> = None;
    for b in batches {
        let snaps = col::<StringArray>(b, "snapshot_id")?;
        let repos = col::<StringArray>(b, "repo")?;
        let ts = col::<TimestampMicrosecondArray>(b, "ts_micros")?;
        for i in 0..b.num_rows() {
            // Defensive per-row check rather than relying solely on the scan filter.
            if repos.value(i) != repo {
                continue;
            }
            let t = ts.value(i);
            let newer = match &latest {
                Some((_, best)) => t > *best,
                None => true,
            };
            if newer {
                latest = Some((snaps.value(i).to_string(), t));
            }
        }
    }
    Ok(latest.map(|(snap, _)| snap))
}

/// Materialises every row of `batches` whose `repo` equals `repo` and whose
/// `snapshot_id` equals `snap`. Rows come out in batch/row order, unsorted and
/// not de-duplicated.
fn inventory_rows_in_snapshot(
    batches: &[RecordBatch],
    repo: &str,
    snap: &str,
) -> Result<Vec<TestInventoryRow>> {
    let mut out = Vec::new();
    for b in batches {
        let snaps = col::<StringArray>(b, "snapshot_id")?;
        let repos = col::<StringArray>(b, "repo")?;
        let crate_name = col::<StringArray>(b, "crate_name")?;
        let module_path = col::<StringArray>(b, "module_path")?;
        let test_name = col::<StringArray>(b, "test_name")?;
        let file = col::<StringArray>(b, "file")?;
        let line = col::<Int32Array>(b, "line")?;
        let is_heavy = col::<BooleanArray>(b, "is_heavy")?;
        let is_async = col::<BooleanArray>(b, "is_async")?;
        for i in 0..b.num_rows() {
            if snaps.value(i) != snap || repos.value(i) != repo {
                continue;
            }
            out.push(TestInventoryRow {
                repo: repos.value(i).to_string(),
                crate_name: crate_name.value(i).to_string(),
                module_path: module_path.value(i).to_string(),
                test_name: test_name.value(i).to_string(),
                file: file.value(i).to_string(),
                // The column is Int32. A negative value cannot be a line number, so it
                // is clamped to 0 first, which makes the cast to u32 lossless.
                line: line.value(i).max(0) as u32,
                is_heavy: is_heavy.value(i),
                is_async: is_async.value(i),
            });
        }
    }
    Ok(out)
}

/// Borrowed total ordering over every field of a row, in `key()` order first, so
/// that fully equal rows always end up adjacent after sorting.
fn inventory_order(r: &TestInventoryRow) -> (&str, &str, &str, &str, &str, u32, bool, bool) {
    (
        r.repo.as_str(),
        r.crate_name.as_str(),
        r.test_name.as_str(),
        r.module_path.as_str(),
        r.file.as_str(),
        r.line,
        r.is_heavy,
        r.is_async,
    )
}
pub fn join_matrix(
inventory: &[TestInventoryRow],
results: &[TestResultRow],
) -> Vec<TestMatrixRow> {
use std::collections::HashMap;
let mut latest: HashMap<(String, String, String), &TestResultRow> = HashMap::new();
for r in results {
if status::is_listed(&r.status) {
continue;
}
let key = (r.repo.clone(), r.suite.clone(), r.test_name.clone());
match latest.get(&key) {
Some(prev) if prev.ts_micros >= r.ts_micros => {}
_ => {
latest.insert(key, r);
}
}
}
let mut out: Vec<TestMatrixRow> = inventory
.iter()
.map(|inv| {
let key = inv.key();
let (state, last_status, message) = match latest.get(&key) {
Some(r) if r.status == status::PASS => {
(MatrixState::Ok, r.status.clone(), r.message.clone())
}
Some(r) if status::is_red(&r.status) => {
(MatrixState::Fail, r.status.clone(), r.message.clone())
}
Some(r) => (MatrixState::NotRun, r.status.clone(), r.message.clone()),
None => (MatrixState::NotRun, String::new(), String::new()),
};
TestMatrixRow {
repo: inv.repo.clone(),
suite: inv.crate_name.clone(),
test_name: inv.test_name.clone(),
is_heavy: inv.is_heavy,
is_async: inv.is_async,
state: state.glyph().to_string(),
last_status,
message,
}
})
.collect();
out.sort_by(|a, b| {
(&a.repo, &a.suite, &a.test_name).cmp(&(&b.repo, &b.suite, &b.test_name))
});
out
}
/// Builds the test matrix for `repo`. It joins the newest inventory snapshot
/// (via `query_test_inventory`) with the test results selected by
/// `TestSelector::Repo(repo)`, using `join_matrix`.
///
/// # Errors
/// Propagates failures from the inventory query and from `query_test_results`.
pub fn query_test_matrix(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<TestMatrixRow>> {
    let inventory = query_test_inventory(wh, repo)?;
    let selector = TestSelector::Repo(repo.to_owned());
    let results = wh.block_on(query_test_results(wh, &selector))?;
    let matrix = join_matrix(&inventory, &results);
    Ok(matrix)
}
/// Looks up column `name` in `batch` and downcasts it to the concrete Arrow array
/// type `T`.
///
/// # Errors
/// Fails if the batch has no column called `name`, or if that column is not a `T`.
fn col<'a, T: 'static>(batch: &'a RecordBatch, name: &str) -> Result<&'a T> {
    let Some(array) = batch.column_by_name(name) else {
        return Err(anyhow!("test_inventory missing column `{name}`"));
    };
    array
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("test_inventory column `{name}` has unexpected type"))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Inventory-row fixture. The test lives in `<suite>::tests`, is declared at
    /// `src/lib.rs:10` and is never async.
    fn inv(repo: &str, suite: &str, name: &str, heavy: bool) -> TestInventoryRow {
        TestInventoryRow {
            repo: repo.to_owned(),
            crate_name: suite.to_owned(),
            module_path: format!("{suite}::tests"),
            test_name: name.to_owned(),
            file: String::from("src/lib.rs"),
            line: 10,
            is_heavy: heavy,
            is_async: false,
        }
    }

    /// Pure join. A test with no result, or only a "listed" one, shows `X`. The
    /// newest result decides between `ok` and `fail`.
    #[test]
    fn join_left_outer_gives_x_for_not_run_and_ok_fail_for_results() {
        let inventory: Vec<TestInventoryRow> = [
            ("never_run", false),
            ("passes", false),
            ("fails", false),
            ("heavy_corpus", true),
        ]
        .iter()
        .map(|&(name, heavy)| inv("nornir", "nornir", name, heavy))
        .collect();
        let results = vec![
            TestResultRow::unit("r1", "nornir", "nornir", "passes", status::FAIL, 1.0, 100, "old"),
            TestResultRow::unit("r2", "nornir", "nornir", "passes", status::PASS, 1.0, 200, ""),
            TestResultRow::unit("r2", "nornir", "nornir", "fails", status::FAIL, 1.0, 200, "boom"),
            TestResultRow::unit("r2", "nornir", "nornir", "never_run", status::LISTED, 0.0, 200, ""),
        ];

        let matrix = join_matrix(&inventory, &results);
        let row = |name: &str| matrix.iter().find(|r| r.test_name == name).unwrap();

        assert_eq!(row("never_run").state, "X", "no result + listed-only → not-run X");
        assert_eq!(row("passes").state, "ok", "newest result (pass) wins over old fail");
        assert_eq!(row("fails").state, "fail");
        assert_eq!(row("fails").message, "boom");
        assert_eq!(row("heavy_corpus").state, "X", "discovered, never run → X");
        assert!(row("heavy_corpus").is_heavy, "heavy flag rides the join");
        assert_eq!(matrix.len(), 4);
    }

    /// End-to-end through a temporary warehouse. It writes a symbol scan, reads the
    /// inventory back, then checks the matrix before and after recording one result.
    #[test]
    fn inventory_roundtrips_and_joins_against_results() {
        use crate::knowledge::symbols::{SymbolScan, TestDefRow};
        use crate::warehouse::test_results::append_test_results;

        let tmp = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(tmp.path()).unwrap();

        let alpha_def = TestDefRow {
            crate_name: "demo".into(),
            module_path: "demo::tests".into(),
            test_name: "alpha".into(),
            file: "src/lib.rs".into(),
            line: 3,
            is_heavy: false,
            is_async: false,
        };
        let beta_def = TestDefRow {
            crate_name: "demo".into(),
            module_path: "demo::tests".into(),
            test_name: "beta_heavy".into(),
            file: "src/lib.rs".into(),
            line: 9,
            is_heavy: true,
            is_async: true,
        };
        let scan = SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: "demo".into(),
            tests: vec![alpha_def, beta_def],
            ..Default::default()
        };
        wh.append_symbol_scan(&scan).unwrap();

        let inventory = query_test_inventory(&wh, "demo").unwrap();
        assert_eq!(inventory.len(), 2, "two tests inventoried, no build");
        assert!(inventory
            .iter()
            .any(|r| r.test_name == "beta_heavy" && r.is_heavy && r.is_async));

        let m0 = query_test_matrix(&wh, "demo").unwrap();
        assert!(m0.iter().all(|r| r.state == "X"), "{m0:?}");

        let run = [TestResultRow::unit("run", "demo", "demo", "alpha", status::PASS, 5.0, 500, "")];
        wh.block_on(append_test_results(&wh, &run)).unwrap();

        let m1 = query_test_matrix(&wh, "demo").unwrap();
        let pick = |name: &str| m1.iter().find(|r| r.test_name == name).unwrap();
        let alpha = pick("alpha");
        let beta = pick("beta_heavy");
        assert_eq!(alpha.state, "ok", "the run filled ok");
        assert_eq!(beta.state, "X", "the un-run test stays not-run");
    }
}