use std::collections::BTreeMap;
use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Float64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use chrono::{TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use uuid::Uuid;
use super::iceberg::{IcebergWarehouse, TABLE_TEST_RESULTS, append_batch};
// Positional indices of the `test_results` columns as read back by
// `query_test_results`. `append_test_results` builds its column vector in this
// same order (run_id, repo, suite, test_name, status, duration_ms, ts, message),
// so these indices, that vector, and the table schema must all stay in sync.
const COL_RUN_ID: usize = 0;
const COL_REPO: usize = 1;
const COL_SUITE: usize = 2;
const COL_TEST_NAME: usize = 3;
const COL_STATUS: usize = 4;
const COL_DURATION_MS: usize = 5;
const COL_TS_MICROS: usize = 6;
const COL_MESSAGE: usize = 7;
/// Canonical values for [`TestResultRow::status`].
pub mod status {
    pub const PASS: &str = "pass";
    pub const FAIL: &str = "fail";
    pub const IGNORED: &str = "ignored";
    pub const STALLED: &str = "stalled";

    /// Returns `true` for the statuses that turn a run red: [`FAIL`] and [`STALLED`].
    ///
    /// The comparison is exact and case-sensitive; any other string is not red.
    pub fn is_red(s: &str) -> bool {
        matches!(s, FAIL | STALLED)
    }
}
/// One test-case outcome; each value is written as one row of the
/// `test_results` table by `append_test_results`.
#[derive(Debug, Clone, PartialEq)]
pub struct TestResultRow {
    /// Identifier of the run this case belongs to (presumably from `new_run_id`).
    pub run_id: String,
    /// Repository the run was executed for.
    pub repo: String,
    /// Suite the case belongs to.
    pub suite: String,
    /// Name of the individual test case.
    pub test_name: String,
    /// Expected to be one of the `status::*` strings.
    pub status: String,
    /// Duration of the case in milliseconds.
    pub duration_ms: f64,
    /// Timestamp in microseconds since the Unix epoch (rendered as UTC).
    pub ts_micros: i64,
    /// Diagnostic text; empty when there is nothing to report.
    pub message: String,
}

impl TestResultRow {
    /// Sort key used by `query_test_results`: timestamp first, then run id,
    /// suite and test name.
    fn key(&self) -> (i64, String, String, String) {
        let Self { ts_micros, run_id, suite, test_name, .. } = self;
        (*ts_micros, run_id.to_owned(), suite.to_owned(), test_name.to_owned())
    }
}
/// Appends `rows` to the `test_results` Iceberg table as a single record batch.
///
/// An empty slice is a no-op and does not touch the catalog. The columns are
/// built in the same positional order that `query_test_results` reads back via
/// the `COL_*` indices, and are validated against the table's current schema
/// by `RecordBatch::try_new`.
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to
/// Arrow, the built columns do not match that schema, or the append fails.
pub async fn append_test_results(wh: &IcebergWarehouse, rows: &[TestResultRow]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_RESULTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    // Build every column straight from borrowed row fields: Arrow copies the
    // bytes into its own buffers anyway, so no intermediate `Vec<String>` is needed.
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.run_id.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.repo.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.suite.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.test_name.as_str()))),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.status.as_str()))),
        Arc::new(Float64Array::from_iter_values(rows.iter().map(|r| r.duration_ms))),
        // Tagged "+00:00"; NOTE(review): presumably the table column is a
        // timezone-aware timestamp — confirm against the table definition.
        Arc::new(
            TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
                .with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from_iter_values(rows.iter().map(|r| r.message.as_str()))),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Filter applied by `query_test_results` to decide which rows to return.
#[derive(Debug, Clone)]
pub enum TestSelector {
    /// Keep only rows whose `run_id` equals the given id.
    Run(String),
    /// Keep only rows whose `repo` equals the given name.
    Repo(String),
    /// Keep every row.
    All,
}
/// Scans the whole `test_results` table and returns the rows matching `sel`.
///
/// The selector is applied client-side after the scan. The result is ordered
/// by `(ts_micros, run_id, suite, test_name)`.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a returned batch does
/// not have the column layout described by the `COL_*` indices.
pub async fn query_test_results(
    wh: &IcebergWarehouse,
    sel: &TestSelector,
) -> Result<Vec<TestResultRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_RESULTS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;
    let mut out: Vec<TestResultRow> = Vec::new();
    for b in &batches {
        let run_id = col_str(b, COL_RUN_ID)?;
        let repo = col_str(b, COL_REPO)?;
        let suite = col_str(b, COL_SUITE)?;
        let test_name = col_str(b, COL_TEST_NAME)?;
        let st = col_str(b, COL_STATUS)?;
        let dur = col_f64(b, COL_DURATION_MS)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let msg = col_str(b, COL_MESSAGE)?;
        // Evaluate the selector on the borrowed Arrow values so rows that are
        // filtered out never allocate owned strings.
        let selected = |i: usize| match sel {
            TestSelector::Run(id) => run_id.value(i) == id.as_str(),
            TestSelector::Repo(r) => repo.value(i) == r.as_str(),
            TestSelector::All => true,
        };
        out.extend((0..b.num_rows()).filter(|&i| selected(i)).map(|i| TestResultRow {
            run_id: run_id.value(i).to_string(),
            repo: repo.value(i).to_string(),
            suite: suite.value(i).to_string(),
            test_name: test_name.value(i).to_string(),
            status: st.value(i).to_string(),
            duration_ms: dur.value(i),
            ts_micros: ts.value(i),
            message: msg.value(i).to_string(),
        }));
    }
    // `key()` clones three strings; caching computes it once per row instead of
    // once per comparison. Like `sort_by_key`, this sort is stable.
    out.sort_by_cached_key(|r| r.key());
    Ok(out)
}
/// Serialises `rows` as a JSON array with one object per line.
///
/// Each object carries every field of [`TestResultRow`]. `ts_micros` is
/// rendered as an RFC 3339 string under `"ts"` (an empty string if chrono cannot
/// represent it). `duration_ms` is printed with three decimals, or as `null`
/// when it is NaN or infinite, since JSON has no literal for those values.
pub fn rows_to_json(rows: &[TestResultRow]) -> String {
    use std::fmt::Write as _;

    /// Escapes `s` for use inside a JSON string literal.
    fn esc(s: &str) -> String {
        let mut o = String::with_capacity(s.len() + 2);
        for c in s.chars() {
            match c {
                '"' => o.push_str("\\\""),
                '\\' => o.push_str("\\\\"),
                '\n' => o.push_str("\\n"),
                '\t' => o.push_str("\\t"),
                '\r' => o.push_str("\\r"),
                // JSON forbids raw U+0000..=U+001F inside strings. Captured test
                // output often contains such bytes (e.g. ESC from ANSI colour
                // codes), which would otherwise make the document unparsable.
                c if c < '\u{20}' => o.push_str(&format!("\\u{:04x}", c as u32)),
                c => o.push(c),
            }
        }
        o
    }

    let mut s = String::from("[\n");
    for (i, r) in rows.iter().enumerate() {
        let ts_rfc = Utc
            .timestamp_micros(r.ts_micros)
            .single()
            .map(|d| d.to_rfc3339())
            .unwrap_or_default();
        let duration = if r.duration_ms.is_finite() {
            format!("{:.3}", r.duration_ms)
        } else {
            "null".to_string()
        };
        let sep = if i + 1 < rows.len() { "," } else { "" };
        writeln!(
            s,
            " {{\"run_id\": \"{}\", \"repo\": \"{}\", \"suite\": \"{}\", \
             \"test_name\": \"{}\", \"status\": \"{}\", \"duration_ms\": {}, \
             \"ts\": \"{}\", \"message\": \"{}\"}}{}",
            esc(&r.run_id),
            esc(&r.repo),
            esc(&r.suite),
            esc(&r.test_name),
            esc(&r.status),
            duration,
            esc(&ts_rfc),
            esc(&r.message),
            sep,
        )
        .expect("writing to a String cannot fail");
    }
    s.push(']');
    s
}
/// Per-run tallies of case outcomes, as produced by `summarize_runs`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RunSummary {
    /// Run identifier shared by all counted rows.
    pub run_id: String,
    /// Repository of the run.
    pub repo: String,
    /// Timestamp of the run in microseconds since the Unix epoch
    /// (`summarize_runs` stores the latest row timestamp here).
    pub ts_micros: i64,
    /// Cases that passed.
    pub passed: usize,
    /// Cases that failed.
    pub failed: usize,
    /// Cases that were ignored.
    pub ignored: usize,
    /// Cases that stalled.
    pub stalled: usize,
}

impl RunSummary {
    /// Number of cases counted across all four tallies.
    pub fn total(&self) -> usize {
        [self.passed, self.failed, self.ignored, self.stalled].iter().sum()
    }

    /// `true` when no case failed or stalled; ignored cases do not make a run red.
    pub fn green(&self) -> bool {
        !(self.failed > 0 || self.stalled > 0)
    }
}
/// Aggregates per-case rows into one [`RunSummary`] per `run_id`, newest first.
///
/// A run's `repo` is taken from the first row seen for it, and its `ts_micros`
/// is the maximum timestamp among its rows. Any status other than pass, ignored
/// or stalled (including unrecognised strings) is tallied as a failure. Runs
/// with equal timestamps keep ascending `run_id` order.
pub fn summarize_runs(rows: &[TestResultRow]) -> Vec<RunSummary> {
    let mut runs: BTreeMap<String, RunSummary> = BTreeMap::new();
    for row in rows {
        let summary = runs.entry(row.run_id.clone()).or_insert_with(|| RunSummary {
            run_id: row.run_id.clone(),
            repo: row.repo.clone(),
            ts_micros: row.ts_micros,
            ..Default::default()
        });
        if row.ts_micros > summary.ts_micros {
            summary.ts_micros = row.ts_micros;
        }
        let counter = match row.status.as_str() {
            status::PASS => &mut summary.passed,
            status::IGNORED => &mut summary.ignored,
            status::STALLED => &mut summary.stalled,
            // FAIL and any unrecognised status both count against the run.
            _ => &mut summary.failed,
        };
        *counter += 1;
    }
    // BTreeMap yields runs in ascending id order; the stable sort keeps that
    // order among runs sharing a timestamp.
    let mut ordered: Vec<RunSummary> = runs.into_values().collect();
    ordered.sort_by_key(|s| std::cmp::Reverse(s.ts_micros));
    ordered
}
/// Renders a plain-text report of `rows`: one header per run (ordered as by
/// `summarize_runs`, newest first) with its tallies, followed by one line per
/// case that counted against the run. Each run block ends with a blank line.
///
/// Returns a fixed placeholder line when `rows` is empty.
pub fn render_matrix(rows: &[TestResultRow]) -> String {
    use std::fmt::Write as _;

    if rows.is_empty() {
        return "(no test runs recorded)\n".to_string();
    }
    let summaries = summarize_runs(rows);
    // Group by borrowed run id; owned keys would copy every id for no benefit.
    let mut by_run: BTreeMap<&str, Vec<&TestResultRow>> = BTreeMap::new();
    for r in rows {
        by_run.entry(r.run_id.as_str()).or_default().push(r);
    }
    let mut out = String::new();
    for s in &summaries {
        let mark = if s.green() { "✓" } else { "✗" };
        let ts = Utc
            .timestamp_micros(s.ts_micros)
            .single()
            .map(|d| d.to_rfc3339())
            .unwrap_or_default();
        writeln!(
            out,
            "{mark} {repo} run {run} [{ts}]\n {p} passed · {f} failed · {ig} ignored · {st} stalled ({tot} total)",
            repo = s.repo,
            run = short_run(&s.run_id),
            p = s.passed,
            f = s.failed,
            ig = s.ignored,
            st = s.stalled,
            tot = s.total(),
        )
        .expect("writing to a String cannot fail");
        if let Some(cases) = by_run.get(s.run_id.as_str()) {
            // `summarize_runs` counts every status other than pass/ignored as a
            // failure (unknown strings included), so list exactly those cases.
            // Filtering on `status::is_red` alone would leave a red run with an
            // unrecognised status showing no offending case.
            let counted_red = cases
                .iter()
                .filter(|c| !matches!(c.status.as_str(), status::PASS | status::IGNORED));
            for c in counted_red {
                let detail = if c.message.is_empty() {
                    String::new()
                } else {
                    format!(" — {}", c.message)
                };
                writeln!(out, " ✗ {}::{} {}{}", c.suite, c.test_name, c.status, detail)
                    .expect("writing to a String cannot fail");
            }
        }
        out.push('\n');
    }
    out
}
/// Abbreviates a run id for display.
///
/// Ids longer than 12 characters are cut to their first 8 characters followed
/// by `…`; shorter ids are returned unchanged. Lengths are counted in `char`s,
/// so multi-byte ids are never sliced inside a character (which would panic).
pub fn short_run(run_id: &str) -> String {
    // A 13th character exists only when the id is longer than 12 characters.
    if run_id.chars().nth(12).is_none() {
        return run_id.to_string();
    }
    // Byte offset where the 9th character starts == byte length of the first 8.
    let cut = run_id.char_indices().nth(8).map_or(run_id.len(), |(i, _)| i);
    format!("{}…", &run_id[..cut])
}
/// Returns a fresh run identifier: the string form of a random (version 4) UUID.
pub fn new_run_id() -> String {
    let id = Uuid::new_v4();
    id.to_string()
}
/// Returns column `idx` of `b` downcast to a `StringArray`.
///
/// # Errors
/// Fails if the batch has no column `idx` or that column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    // `RecordBatch::column` panics on an out-of-range index; a short batch
    // (e.g. after schema drift) should surface as an error instead.
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!("test_results col {idx} missing (batch has {n} columns)", n = b.num_columns())
        })?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not StringArray"))
}
/// Returns column `idx` of `b` downcast to a `Float64Array`.
///
/// # Errors
/// Fails if the batch has no column `idx` or that column is not a `Float64Array`.
fn col_f64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Float64Array> {
    // `RecordBatch::column` panics on an out-of-range index; a short batch
    // (e.g. after schema drift) should surface as an error instead.
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!("test_results col {idx} missing (batch has {n} columns)", n = b.num_columns())
        })?
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not Float64Array"))
}
/// Returns column `idx` of `b` downcast to a `TimestampMicrosecondArray`
/// (the downcast succeeds regardless of the column's timezone tag).
///
/// # Errors
/// Fails if the batch has no column `idx` or that column is not a
/// `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    // `RecordBatch::column` panics on an out-of-range index; a short batch
    // (e.g. after schema drift) should surface as an error instead.
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!("test_results col {idx} missing (batch has {n} columns)", n = b.num_columns())
        })?
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not TimestampMicrosecondArray"))
}
// Tests: a round trip through a temporary on-disk warehouse, plus pure unit
// tests for the status helpers.
#[cfg(test)]
mod tests {
use super::*;
/// Appends two batches (repos "nornir" and "znippy", three runs in total) to a
/// fresh warehouse, then checks every `TestSelector`, the per-run summaries,
/// the JSON export, and the text matrix built from the queried rows.
#[test]
fn append_query_round_trip_groups_and_counts() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
// runA (ts 100): one pass, one fail, one ignored. runB (ts 200): one stalled case.
let nornir_rows = vec![
TestResultRow {
run_id: "runA".into(), repo: "nornir".into(), suite: "nornir".into(),
test_name: "a::passes".into(), status: status::PASS.into(),
duration_ms: 12.0, ts_micros: 100, message: String::new(),
},
TestResultRow {
run_id: "runA".into(), repo: "nornir".into(), suite: "nornir".into(),
test_name: "a::fails".into(), status: status::FAIL.into(),
duration_ms: 3.0, ts_micros: 100, message: "assert left == right".into(),
},
TestResultRow {
run_id: "runA".into(), repo: "nornir".into(), suite: "nornir".into(),
test_name: "a::skipped".into(), status: status::IGNORED.into(),
duration_ms: 0.0, ts_micros: 100, message: String::new(),
},
TestResultRow {
run_id: "runB".into(), repo: "nornir".into(), suite: "nornir".into(),
test_name: "b::hangs".into(), status: status::STALLED.into(),
duration_ms: 120_000.0, ts_micros: 200, message: "no output for 120s".into(),
},
];
// A second repo, so that the Repo selector has something to exclude.
let znippy_rows = vec![TestResultRow {
run_id: "runZ".into(), repo: "znippy".into(), suite: "znippy".into(),
test_name: "z::ok".into(), status: status::PASS.into(),
duration_ms: 1.0, ts_micros: 50, message: String::new(),
}];
wh.block_on(append_test_results(&wh, &nornir_rows)).unwrap();
wh.block_on(append_test_results(&wh, &znippy_rows)).unwrap();
// Selecting one run returns only its three cases, with their tallies.
let a = wh
.block_on(query_test_results(&wh, &TestSelector::Run("runA".into())))
.unwrap();
assert_eq!(a.len(), 3);
let sum = summarize_runs(&a);
assert_eq!(sum.len(), 1);
assert_eq!((sum[0].passed, sum[0].failed, sum[0].ignored, sum[0].stalled), (1, 1, 1, 0));
assert!(!sum[0].green(), "runA has a failing case");
assert_eq!(sum[0].total(), 3);
// Numeric and string fields survive the round trip unchanged.
let fail = a.iter().find(|r| r.test_name == "a::fails").unwrap();
assert_eq!(fail.duration_ms, 3.0);
assert_eq!(fail.message, "assert left == right");
// Selecting by repo spans both nornir runs; runB (ts 200) sorts before runA (ts 100).
let nornir = wh
.block_on(query_test_results(&wh, &TestSelector::Repo("nornir".into())))
.unwrap();
assert_eq!(nornir.len(), 4);
let runs = summarize_runs(&nornir);
assert_eq!(runs.len(), 2);
assert_eq!(runs[0].run_id, "runB");
assert_eq!(runs[0].stalled, 1);
assert!(!runs[0].green());
// All returns every appended row from both batches.
let all = wh.block_on(query_test_results(&wh, &TestSelector::All)).unwrap();
assert_eq!(all.len(), 5);
// The JSON export must parse, with one element per row.
let json = rows_to_json(&a);
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.as_array().unwrap().len(), 3);
// Red cases (stalled and failed) are listed in the rendered matrix.
let matrix = render_matrix(&nornir);
assert!(matrix.contains("b::hangs"), "stalled case shown: {matrix}");
assert!(matrix.contains("a::fails"), "failed case shown");
}
/// Only FAIL and STALLED are red; PASS and IGNORED are not.
#[test]
fn status_red_classification() {
assert!(status::is_red(status::FAIL));
assert!(status::is_red(status::STALLED));
assert!(!status::is_red(status::PASS));
assert!(!status::is_red(status::IGNORED));
}
}