use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{
Array, Float64Array, RecordBatch, StringArray, TimestampMicrosecondArray,
};
use futures::TryStreamExt;
use iceberg::Catalog;
use iceberg::arrow::schema_to_arrow_schema;
use super::iceberg::{IcebergWarehouse, TABLE_TEST_RESULTS, append_batch, ensure_table_schema};
pub use nornir_testmatrix::model::{
new_run_id, render_matrix, rows_to_json, short_run, status, summarize_runs, RunSummary,
TestResultRow, TestSelector, ASPECT_UNIT,
};
pub use nornir_testmatrix::sink::TestSink;
// Positional indices of the `test_results` columns inside a scanned
// `RecordBatch`. `append_test_results` builds its column vector in this same
// order, so the two must stay in sync.
const COL_RUN_ID: usize = 0;
const COL_REPO: usize = 1;
const COL_SUITE: usize = 2;
const COL_TEST_NAME: usize = 3;
const COL_STATUS: usize = 4;
const COL_DURATION_MS: usize = 5;
const COL_TS_MICROS: usize = 6;
const COL_MESSAGE: usize = 7;
// The two columns below are read through the `opt_col_*` helpers, which
// tolerate a batch that does not carry them.
const COL_ASPECT: usize = 8;
const COL_METRIC: usize = 9;
/// Appends `rows` to the `test_results` table as one Arrow record batch.
///
/// An empty `rows` slice is a no-op. Otherwise the table is loaded, passed
/// through `ensure_table_schema` together with the expected `test_results`
/// schema, and the batch is built against the table's resulting current
/// schema. Columns are emitted in the `COL_*` positional order.
///
/// Rows whose `aspect` is empty are stored with `ASPECT_UNIT` instead.
/// Timestamps are tagged with the `+00:00` timezone.
///
/// # Errors
///
/// Returns an error if loading the table, reconciling the schema, converting
/// the schema to Arrow, assembling the record batch, or appending it fails.
pub async fn append_test_results(wh: &IcebergWarehouse, rows: &[TestResultRow]) -> Result<()> {
    /// Builds a non-null UTF-8 column straight from borrowed string slices,
    /// so no per-row `String` clone or intermediate `Vec` is needed.
    fn utf8_column<'r>(values: impl Iterator<Item = &'r str>) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(values))
    }

    if rows.is_empty() {
        return Ok(());
    }

    let ident = wh.table_ident(TABLE_TEST_RESULTS);
    let table = wh.catalog().load_table(&ident).await?;
    let table = ensure_table_schema(
        wh.catalog(),
        &ident,
        table,
        &super::iceberg_schema::test_results()?,
    )
    .await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Order must match the COL_* indices used when reading the table back.
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8_column(rows.iter().map(|r| r.run_id.as_str())),
        utf8_column(rows.iter().map(|r| r.repo.as_str())),
        utf8_column(rows.iter().map(|r| r.suite.as_str())),
        utf8_column(rows.iter().map(|r| r.test_name.as_str())),
        utf8_column(rows.iter().map(|r| r.status.as_str())),
        Arc::new(Float64Array::from(rows.iter().map(|r| r.duration_ms).collect::<Vec<_>>())),
        Arc::new(
            TimestampMicrosecondArray::from(rows.iter().map(|r| r.ts_micros).collect::<Vec<_>>())
                .with_timezone("+00:00"),
        ),
        utf8_column(rows.iter().map(|r| r.message.as_str())),
        // An empty aspect is persisted as the default unit aspect.
        utf8_column(rows.iter().map(|r| {
            if r.aspect.is_empty() {
                ASPECT_UNIT
            } else {
                r.aspect.as_str()
            }
        })),
        Arc::new(Float64Array::from(rows.iter().map(|r| r.metric).collect::<Vec<_>>())),
    ];

    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Reads rows from the `test_results` table that match `sel`.
///
/// The whole table is scanned (no filter is handed to the scan) and the
/// selector is applied in memory: `Run` matches on `run_id`, `Repo` matches on
/// `repo`, and `All` keeps every row. The result is sorted by
/// `TestResultRow::key()`.
///
/// The `aspect` and `metric` columns are optional on read: when the column is
/// absent from a batch, has an unexpected type, or the value is null, `aspect`
/// falls back to `ASPECT_UNIT` and `metric` to `0.0`.
///
/// # Errors
///
/// Returns an error if the table cannot be loaded or scanned, or if one of
/// the first eight columns does not have the expected Arrow array type.
pub async fn query_test_results(
    wh: &IcebergWarehouse,
    sel: &TestSelector,
) -> Result<Vec<TestResultRow>> {
    let table = wh.catalog().load_table(&wh.table_ident(TABLE_TEST_RESULTS)).await?;
    let scan = table.scan().build()?;
    let stream = scan.to_arrow().await?;
    let batches: Vec<RecordBatch> = stream.try_collect().await?;

    let mut out: Vec<TestResultRow> = Vec::new();
    for b in &batches {
        let run_id = col_str(b, COL_RUN_ID)?;
        let repo = col_str(b, COL_REPO)?;
        let suite = col_str(b, COL_SUITE)?;
        let test_name = col_str(b, COL_TEST_NAME)?;
        let st = col_str(b, COL_STATUS)?;
        let dur = col_f64(b, COL_DURATION_MS)?;
        let ts = col_ts(b, COL_TS_MICROS)?;
        let msg = col_str(b, COL_MESSAGE)?;
        let aspect = opt_col_str(b, COL_ASPECT);
        let metric = opt_col_f64(b, COL_METRIC);

        for i in 0..b.num_rows() {
            // Decide on the borrowed column values first so that rejected rows
            // never pay for the string allocations of a full `TestResultRow`.
            let keep = match sel {
                TestSelector::Run(id) => id == run_id.value(i),
                TestSelector::Repo(r) => r == repo.value(i),
                TestSelector::All => true,
            };
            if !keep {
                continue;
            }

            out.push(TestResultRow {
                run_id: run_id.value(i).to_string(),
                repo: repo.value(i).to_string(),
                suite: suite.value(i).to_string(),
                test_name: test_name.value(i).to_string(),
                status: st.value(i).to_string(),
                duration_ms: dur.value(i),
                ts_micros: ts.value(i),
                message: msg.value(i).to_string(),
                // Missing column or null value -> default unit aspect.
                aspect: aspect
                    .filter(|a| !a.is_null(i))
                    .map(|a| a.value(i).to_string())
                    .unwrap_or_else(|| ASPECT_UNIT.to_string()),
                // Missing column or null value -> 0.0.
                metric: metric.filter(|m| !m.is_null(i)).map(|m| m.value(i)).unwrap_or(0.0),
            });
        }
    }

    out.sort_by_key(|r| r.key());
    Ok(out)
}
/// Test-result sink backed by a borrowed [`IcebergWarehouse`]; it implements
/// [`TestSink`] by appending rows to the warehouse.
pub struct IcebergTestSink<'a> {
/// Warehouse that receives the appended rows.
wh: &'a IcebergWarehouse,
}
impl<'a> IcebergTestSink<'a> {
pub fn new(wh: &'a IcebergWarehouse) -> Self {
Self { wh }
}
}
impl TestSink for IcebergTestSink<'_> {
    /// Appends `rows` by driving the async `append_test_results` to completion
    /// through the warehouse's `block_on`.
    fn append(&self, rows: &[TestResultRow]) -> Result<()> {
        let warehouse = self.wh;
        let pending = append_test_results(warehouse, rows);
        warehouse.block_on(pending)
    }
}
/// Returns column `idx` of `b` as a [`StringArray`].
///
/// # Errors
///
/// Fails when the batch has no column at `idx` (instead of panicking on the
/// out-of-range index) or when that column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a StringArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "test_results col {idx} is missing (batch has {} columns)",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not StringArray"))
}
/// Returns column `idx` of `b` as a [`Float64Array`].
///
/// # Errors
///
/// Fails when the batch has no column at `idx` (instead of panicking on the
/// out-of-range index) or when that column is not a `Float64Array`.
fn col_f64<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a Float64Array> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "test_results col {idx} is missing (batch has {} columns)",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<Float64Array>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not Float64Array"))
}
/// Returns column `idx` of `b` as a [`TimestampMicrosecondArray`].
///
/// # Errors
///
/// Fails when the batch has no column at `idx` (instead of panicking on the
/// out-of-range index) or when that column is not a
/// `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize) -> Result<&'a TimestampMicrosecondArray> {
    b.columns()
        .get(idx)
        .ok_or_else(|| {
            anyhow!(
                "test_results col {idx} is missing (batch has {} columns)",
                b.num_columns()
            )
        })?
        .as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("test_results col {idx} is not TimestampMicrosecondArray"))
}
/// Best-effort lookup of column `idx` as a [`StringArray`].
///
/// Yields `None` when the batch has no such column or when the column is of
/// a different array type.
fn opt_col_str(b: &RecordBatch, idx: usize) -> Option<&StringArray> {
    let column = b.columns().get(idx)?;
    column.as_any().downcast_ref::<StringArray>()
}
/// Best-effort lookup of column `idx` as a [`Float64Array`].
///
/// Yields `None` when the batch has no such column or when the column is of
/// a different array type.
fn opt_col_f64(b: &RecordBatch, idx: usize) -> Option<&Float64Array> {
    let column = b.columns().get(idx)?;
    column.as_any().downcast_ref::<Float64Array>()
}
// Round-trip tests that run against a warehouse opened in a temporary
// directory.
#[cfg(test)]
mod tests {
use super::*;
/// Appends rows for two repos spread over three runs, then checks that each
/// selector (`Run`, `Repo`, `All`) returns the expected rows and that the run
/// summaries, JSON export and rendered matrix reflect them.
#[test]
fn append_query_round_trip_groups_and_counts() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let nornir_rows = vec![
TestResultRow::unit("runA", "nornir", "nornir", "a::passes", status::PASS, 12.0, 100, ""),
TestResultRow::unit("runA", "nornir", "nornir", "a::fails", status::FAIL, 3.0, 100, "assert left == right"),
TestResultRow::unit("runA", "nornir", "nornir", "a::skipped", status::IGNORED, 0.0, 100, ""),
TestResultRow::unit("runB", "nornir", "nornir", "b::hangs", status::STALLED, 120_000.0, 200, "no output for 120s"),
// A row with an explicit non-default aspect and a metric value.
TestResultRow {
run_id: "runB".into(), repo: "nornir".into(), suite: "nornir".into(),
test_name: "clippy".into(), status: status::FAIL.into(),
duration_ms: 50.0, ts_micros: 200, message: "4 clippy warning(s)".into(),
aspect: "clippy".into(), metric: 4.0,
},
];
let znippy_rows = vec![TestResultRow::unit(
"runZ", "znippy", "znippy", "z::ok", status::PASS, 1.0, 50, "",
)];
wh.block_on(append_test_results(&wh, &nornir_rows)).unwrap();
wh.block_on(append_test_results(&wh, &znippy_rows)).unwrap();
// Selecting by run id returns only runA's three rows.
let a = wh
.block_on(query_test_results(&wh, &TestSelector::Run("runA".into())))
.unwrap();
assert_eq!(a.len(), 3);
let sum = summarize_runs(&a);
assert_eq!(sum.len(), 1);
assert_eq!((sum[0].passed, sum[0].failed, sum[0].ignored, sum[0].stalled), (1, 1, 1, 0));
assert!(!sum[0].green(), "runA has a failing case");
assert_eq!(sum[0].total(), 3);
let fail = a.iter().find(|r| r.test_name == "a::fails").unwrap();
assert_eq!(fail.duration_ms, 3.0);
assert_eq!(fail.message, "assert left == right");
assert_eq!(fail.aspect, ASPECT_UNIT, "unit rows default the aspect");
assert_eq!(fail.metric, 0.0);
// Selecting by repo returns both nornir runs (five rows in total).
let nornir = wh
.block_on(query_test_results(&wh, &TestSelector::Repo("nornir".into())))
.unwrap();
assert_eq!(nornir.len(), 5);
let clippy = nornir.iter().find(|r| r.test_name == "clippy").unwrap();
assert_eq!(clippy.aspect, "clippy");
assert_eq!(clippy.metric, 4.0);
let runs = summarize_runs(&nornir);
assert_eq!(runs.len(), 2);
assert_eq!(runs[0].run_id, "runB");
assert_eq!(runs[0].stalled, 1);
assert!(!runs[0].green());
// `All` returns every appended row across both repos.
let all = wh.block_on(query_test_results(&wh, &TestSelector::All)).unwrap();
assert_eq!(all.len(), 6);
let json = rows_to_json(&a);
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.as_array().unwrap().len(), 3);
let matrix = render_matrix(&nornir);
assert!(matrix.contains("b::hangs"), "stalled case shown: {matrix}");
assert!(matrix.contains("clippy"), "clippy aspect failure shown");
}
/// A row appended through `IcebergTestSink`'s `TestSink::append` reads back
/// with its aspect, metric and status intact.
#[test]
fn iceberg_sink_appends_via_trait() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let sink = IcebergTestSink::new(&wh);
let rows = vec![TestResultRow {
run_id: "rS".into(), repo: "leaf".into(), suite: "leaf".into(),
test_name: "coverage".into(), status: status::PASS.into(),
duration_ms: 9.0, ts_micros: 300, message: "91.3% line coverage".into(),
aspect: "coverage".into(), metric: 91.3,
}];
sink.append(&rows).unwrap();
let back = wh
.block_on(query_test_results(&wh, &TestSelector::Run("rS".into())))
.unwrap();
assert_eq!(back.len(), 1);
assert_eq!(back[0].aspect, "coverage");
assert_eq!(back[0].metric, 91.3);
assert_eq!(back[0].status, status::PASS);
}
/// Builds a schema holding only the eight required columns `run_id` through
/// `message`, i.e. without `aspect` and `metric`.
fn legacy_8_col_schema() -> iceberg::spec::Schema {
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
Arc::new(NestedField::required(1, "run_id", Type::Primitive(PrimitiveType::String))),
Arc::new(NestedField::required(2, "repo", Type::Primitive(PrimitiveType::String))),
Arc::new(NestedField::required(3, "suite", Type::Primitive(PrimitiveType::String))),
Arc::new(NestedField::required(4, "test_name", Type::Primitive(PrimitiveType::String))),
Arc::new(NestedField::required(5, "status", Type::Primitive(PrimitiveType::String))),
Arc::new(NestedField::required(6, "duration_ms", Type::Primitive(PrimitiveType::Double))),
Arc::new(NestedField::required(7, "ts_micros", Type::Primitive(PrimitiveType::Timestamptz))),
Arc::new(NestedField::required(8, "message", Type::Primitive(PrimitiveType::String))),
])
.build()
.unwrap()
}
/// Replaces the table with one created from the 8-column schema, appends a
/// row carrying `aspect`/`metric`, and checks that the table schema then has
/// 10 columns and that the row reads back with both values.
#[test]
fn stale_8col_table_evolves_to_10_on_append() {
use iceberg::Catalog;
use iceberg::spec::{PartitionSpec, Transform};
use iceberg::TableCreation;
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let ident = wh.table_ident(TABLE_TEST_RESULTS);
// Drop the table and recreate it with the 8-column schema, partitioned by
// identity on `repo`.
wh.block_on(async {
let cat = wh.catalog();
cat.drop_table(&ident).await.unwrap();
let schema = legacy_8_col_schema();
let spec = PartitionSpec::builder(Arc::new(schema.clone()))
.add_partition_field("repo", "repo", Transform::Identity)
.unwrap()
.build()
.unwrap()
.into_unbound();
let creation = TableCreation::builder()
.name(ident.name().to_string())
.schema(schema)
.partition_spec(spec)
.build();
cat.create_table(ident.namespace(), creation).await.unwrap();
let t = cat.load_table(&ident).await.unwrap();
assert_eq!(t.metadata().current_schema().as_struct().fields().len(), 8);
});
let rows = vec![TestResultRow {
run_id: "evo".into(), repo: "nornir".into(), suite: "nornir".into(),
test_name: "clippy".into(), status: status::FAIL.into(),
duration_ms: 50.0, ts_micros: 200, message: "4 warnings".into(),
aspect: "clippy".into(), metric: 4.0,
}];
wh.block_on(append_test_results(&wh, &rows)).unwrap();
// After the append the current schema must include the two extra columns.
wh.block_on(async {
let t = wh.catalog().load_table(&ident).await.unwrap();
let names: Vec<&str> = t
.metadata().current_schema().as_struct().fields()
.iter().map(|f| f.name.as_str()).collect();
assert_eq!(names.len(), 10, "schema evolved 8 → 10 columns");
assert!(names.contains(&"aspect"));
assert!(names.contains(&"metric"));
});
let got = wh.block_on(query_test_results(&wh, &TestSelector::Run("evo".into()))).unwrap();
assert_eq!(got.len(), 1);
assert_eq!(got[0].aspect, "clippy", "evolved aspect reads back");
assert_eq!(got[0].metric, 4.0, "evolved metric reads back");
}
/// `status::is_red` is true for FAIL and STALLED and false for PASS, IGNORED
/// and SKIP.
#[test]
fn status_red_classification() {
assert!(status::is_red(status::FAIL));
assert!(status::is_red(status::STALLED));
assert!(!status::is_red(status::PASS));
assert!(!status::is_red(status::IGNORED));
assert!(!status::is_red(status::SKIP));
}
}