use std::sync::Arc;
use anyhow::{anyhow, Result};
use arrow::array::{Array, Int64Array, RecordBatch, StringArray, TimestampMicrosecondArray};
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::Catalog;
use super::iceberg::{
append_batch, ensure_table_schema, IcebergWarehouse, TABLE_COVERAGE, TABLE_COVERAGE_FN,
};
use crate::coverage::{Boundary, CoverageReport, CrateCoverage, FileCoverage, FnCoverage};
/// One row of the coverage table: line/region counters for a single scope of one run.
///
/// `scope` is `"overall"`, `"crate:<krate>"` or `"file:<path>"` when produced by
/// [`rows_for`]; `krate` and `file` hold empty strings where they do not apply.
#[derive(Debug, Clone, PartialEq)]
pub struct CoverageRow {
    /// Identifier of the coverage run this row belongs to.
    pub run_id: String,
    /// Run timestamp in microseconds (stored in a `+00:00` timestamp column).
    pub ts_micros: i64,
    /// Repository the run was taken from.
    pub repo: String,
    /// Aggregation scope of this row (see the type-level docs).
    pub scope: String,
    /// Crate name, or empty for the overall row.
    pub krate: String,
    /// Source file path, or empty for overall/crate rows.
    pub file: String,
    /// Total instrumented lines in this scope.
    pub lines: i64,
    /// Lines that were executed at least once.
    pub lines_covered: i64,
    /// Total instrumented regions in this scope.
    pub regions: i64,
    /// Regions that were executed at least once.
    pub regions_covered: i64,
}
impl CoverageRow {
    /// Covered lines as a percentage of `lines`, as computed by `crate::coverage::pct_i64`.
    pub fn line_pct(&self) -> f64 {
        let (covered, total) = (self.lines_covered, self.lines);
        crate::coverage::pct_i64(covered, total)
    }
    /// Covered regions as a percentage of `regions`, as computed by `crate::coverage::pct_i64`.
    pub fn region_pct(&self) -> f64 {
        let (covered, total) = (self.regions_covered, self.regions);
        crate::coverage::pct_i64(covered, total)
    }
}
/// One row of the per-function coverage table.
///
/// `boundary` holds the string form of a `Boundary` (`Boundary::as_str`), e.g. `"ui"`,
/// `"emitter"` or `"core"`.
#[derive(Debug, Clone, PartialEq)]
pub struct CoverageFnRow {
    /// Identifier of the coverage run this row belongs to.
    pub run_id: String,
    /// Run timestamp in microseconds (stored in a `+00:00` timestamp column).
    pub ts_micros: i64,
    /// Repository the run was taken from.
    pub repo: String,
    /// Fully qualified function name.
    pub name: String,
    /// Source file the function lives in.
    pub file: String,
    /// String form of the function's `Boundary` classification.
    pub boundary: String,
    /// Total instrumented lines in the function.
    pub lines: i64,
    /// Lines that were executed at least once.
    pub lines_covered: i64,
    /// Total instrumented regions in the function.
    pub regions: i64,
    /// Regions that were executed at least once.
    pub regions_covered: i64,
}
impl CoverageFnRow {
    /// `true` when at least one region or line of the function was hit.
    pub fn exercised(&self) -> bool {
        [self.regions_covered, self.lines_covered]
            .iter()
            .any(|&hits| hits > 0)
    }
    /// `true` for every boundary kind other than `Boundary::Core`.
    pub fn is_boundary(&self) -> bool {
        let core = Boundary::Core.as_str();
        self.boundary != core
    }
}
/// Flattens a [`CoverageReport`] into warehouse rows for one run.
///
/// Returns `(rows, fn_rows)`:
/// - `rows`: one `"overall"` row, then one `"crate:<krate>"` row per entry of
///   `report.crates`, then one `"file:<path>"` row per entry of `report.files`.
/// - `fn_rows`: one row per function of every file, in file order.
///
/// Every row is stamped with the same `run_id`, `repo` and `ts_micros`. Counters are
/// converted to `i64`, saturating at `i64::MAX` rather than wrapping.
pub fn rows_for(
    report: &CoverageReport,
    run_id: &str,
    repo: &str,
    ts_micros: i64,
) -> (Vec<CoverageRow>, Vec<CoverageFnRow>) {
    // The tables store these counters as `long` (i64). An `as` cast would silently wrap an
    // out-of-range count to a negative number; saturate instead.
    fn to_i64<T: std::convert::TryInto<i64>>(n: T) -> i64 {
        n.try_into().unwrap_or(i64::MAX)
    }

    let mk = |scope: String, krate: String, file: String, l: u64, lc: u64, r: u64, rc: u64| {
        CoverageRow {
            run_id: run_id.to_string(),
            ts_micros,
            repo: repo.to_string(),
            scope,
            krate,
            file,
            lines: to_i64(l),
            lines_covered: to_i64(lc),
            regions: to_i64(r),
            regions_covered: to_i64(rc),
        }
    };

    // One "overall" row, then one per crate, then one per file.
    let mut rows = Vec::with_capacity(1 + report.crates.len() + report.files.len());
    rows.push(mk(
        "overall".into(),
        String::new(),
        String::new(),
        report.lines,
        report.lines_covered,
        report.regions,
        report.regions_covered,
    ));
    for c in &report.crates {
        let CrateCoverage { krate, lines, lines_covered, regions, regions_covered, .. } = c;
        rows.push(mk(
            format!("crate:{krate}"),
            krate.clone(),
            String::new(),
            *lines,
            *lines_covered,
            *regions,
            *regions_covered,
        ));
    }
    for f in &report.files {
        let FileCoverage { file, krate, lines, lines_covered, regions, regions_covered, .. } = f;
        rows.push(mk(
            format!("file:{file}"),
            krate.clone(),
            file.clone(),
            *lines,
            *lines_covered,
            *regions,
            *regions_covered,
        ));
    }

    // Per-function rows, flattened across files in file order. The destructure is
    // exhaustive on purpose so a new `FnCoverage` field forces a decision here.
    let fn_rows: Vec<CoverageFnRow> = report
        .files
        .iter()
        .flat_map(|f| &f.functions)
        .map(|fnc| {
            let FnCoverage {
                name, file, lines, lines_covered, regions, regions_covered, boundary,
            } = fnc;
            CoverageFnRow {
                run_id: run_id.to_string(),
                ts_micros,
                repo: repo.to_string(),
                name: name.clone(),
                file: file.clone(),
                boundary: boundary.as_str().to_string(),
                lines: to_i64(*lines),
                lines_covered: to_i64(*lines_covered),
                regions: to_i64(*regions),
                regions_covered: to_i64(*regions_covered),
            }
        })
        .collect();
    (rows, fn_rows)
}
/// Appends `rows` to the coverage table, evolving the table schema first if needed.
///
/// Returns immediately (without touching the catalog) when `rows` is empty. The Arrow schema
/// is derived from the table's current Iceberg schema after `ensure_table_schema`, and the
/// columns are supplied positionally: run_id, ts (microseconds, `+00:00`), repo, scope,
/// krate, file, lines, lines_covered, regions, regions_covered.
///
/// # Errors
/// Propagates catalog, schema-conversion and Arrow errors, including a column/schema
/// mismatch reported by `RecordBatch::try_new`.
pub async fn append_coverage(wh: &IcebergWarehouse, rows: &[CoverageRow]) -> Result<()> {
    // Builds a non-null UTF-8 column by projecting one string field out of every row.
    fn utf8(rows: &[CoverageRow], get: impl Fn(&CoverageRow) -> &str) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(rows.iter().map(get)))
    }
    // Builds a non-null Int64 column by projecting one integer field out of every row.
    fn int64(rows: &[CoverageRow], get: impl Fn(&CoverageRow) -> i64) -> Arc<dyn Array> {
        Arc::new(Int64Array::from_iter_values(rows.iter().map(get)))
    }

    if rows.is_empty() {
        return Ok(());
    }
    let ident = wh.table_ident(TABLE_COVERAGE);
    let loaded = wh.catalog().load_table(&ident).await?;
    let desired = super::iceberg_schema::coverage()?;
    let table = ensure_table_schema(wh.catalog(), &ident, loaded, &desired).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let stamps = TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
        .with_timezone("+00:00");
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8(rows, |r| r.run_id.as_str()),
        Arc::new(stamps),
        utf8(rows, |r| r.repo.as_str()),
        utf8(rows, |r| r.scope.as_str()),
        utf8(rows, |r| r.krate.as_str()),
        utf8(rows, |r| r.file.as_str()),
        int64(rows, |r| r.lines),
        int64(rows, |r| r.lines_covered),
        int64(rows, |r| r.regions),
        int64(rows, |r| r.regions_covered),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Appends `rows` to the per-function coverage table, evolving the table schema first if
/// needed.
///
/// Returns immediately (without touching the catalog) when `rows` is empty. The Arrow schema
/// is derived from the table's current Iceberg schema after `ensure_table_schema`, and the
/// columns are supplied positionally: run_id, ts (microseconds, `+00:00`), repo, name, file,
/// boundary, lines, lines_covered, regions, regions_covered.
///
/// # Errors
/// Propagates catalog, schema-conversion and Arrow errors, including a column/schema
/// mismatch reported by `RecordBatch::try_new`.
pub async fn append_coverage_fn(wh: &IcebergWarehouse, rows: &[CoverageFnRow]) -> Result<()> {
    // Builds a non-null UTF-8 column by projecting one string field out of every row.
    fn utf8(rows: &[CoverageFnRow], get: impl Fn(&CoverageFnRow) -> &str) -> Arc<dyn Array> {
        Arc::new(StringArray::from_iter_values(rows.iter().map(get)))
    }
    // Builds a non-null Int64 column by projecting one integer field out of every row.
    fn int64(rows: &[CoverageFnRow], get: impl Fn(&CoverageFnRow) -> i64) -> Arc<dyn Array> {
        Arc::new(Int64Array::from_iter_values(rows.iter().map(get)))
    }

    if rows.is_empty() {
        return Ok(());
    }
    let ident = wh.table_ident(TABLE_COVERAGE_FN);
    let loaded = wh.catalog().load_table(&ident).await?;
    let desired = super::iceberg_schema::coverage_fn()?;
    let table = ensure_table_schema(wh.catalog(), &ident, loaded, &desired).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let stamps = TimestampMicrosecondArray::from_iter_values(rows.iter().map(|r| r.ts_micros))
        .with_timezone("+00:00");
    let cols: Vec<Arc<dyn Array>> = vec![
        utf8(rows, |r| r.run_id.as_str()),
        Arc::new(stamps),
        utf8(rows, |r| r.repo.as_str()),
        utf8(rows, |r| r.name.as_str()),
        utf8(rows, |r| r.file.as_str()),
        utf8(rows, |r| r.boundary.as_str()),
        int64(rows, |r| r.lines),
        int64(rows, |r| r.lines_covered),
        int64(rows, |r| r.regions),
        int64(rows, |r| r.regions_covered),
    ];
    let batch = RecordBatch::try_new(arrow_schema, cols)?;
    append_batch(wh.catalog(), table, batch).await?;
    Ok(())
}
/// Writes one run's coverage rows and per-function rows to their two tables.
///
/// The coverage table is written first. If that fails, the function rows are not attempted.
/// The two appends are separate calls and are not atomic together: an error from the second
/// is returned after the first has already run (and presumably committed — confirm against
/// `append_batch`).
pub async fn append_report(
    wh: &IcebergWarehouse,
    rows: &[CoverageRow],
    fn_rows: &[CoverageFnRow],
) -> Result<()> {
    append_coverage(wh, rows).await?;
    append_coverage_fn(wh, fn_rows).await?;
    Ok(())
}
pub async fn query_coverage(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<CoverageRow>> {
let table = wh.catalog().load_table(&wh.table_ident(TABLE_COVERAGE)).await?;
let batches: Vec<RecordBatch> = skade::read_all(&table).await?;
let mut out = Vec::new();
for b in &batches {
let run_id = col_str(b, 0, TABLE_COVERAGE)?;
let ts = col_ts(b, 1, TABLE_COVERAGE)?;
let r = col_str(b, 2, TABLE_COVERAGE)?;
let scope = col_str(b, 3, TABLE_COVERAGE)?;
let krate = col_str(b, 4, TABLE_COVERAGE)?;
let file = col_str(b, 5, TABLE_COVERAGE)?;
let lines = col_i64(b, 6, TABLE_COVERAGE)?;
let lc = col_i64(b, 7, TABLE_COVERAGE)?;
let regions = col_i64(b, 8, TABLE_COVERAGE)?;
let rc = col_i64(b, 9, TABLE_COVERAGE)?;
for i in 0..b.num_rows() {
if r.value(i) != repo {
continue;
}
out.push(CoverageRow {
run_id: run_id.value(i).to_string(),
ts_micros: ts.value(i),
repo: r.value(i).to_string(),
scope: scope.value(i).to_string(),
krate: krate.value(i).to_string(),
file: file.value(i).to_string(),
lines: lines.value(i),
lines_covered: lc.value(i),
regions: regions.value(i),
regions_covered: rc.value(i),
});
}
}
out.sort_by(|a, b| (a.ts_micros, &a.scope).cmp(&(b.ts_micros, &b.scope)));
Ok(out)
}
pub async fn query_coverage_fn(wh: &IcebergWarehouse, repo: &str) -> Result<Vec<CoverageFnRow>> {
let table = wh.catalog().load_table(&wh.table_ident(TABLE_COVERAGE_FN)).await?;
let batches: Vec<RecordBatch> = skade::read_all(&table).await?;
let mut out = Vec::new();
for b in &batches {
let run_id = col_str(b, 0, TABLE_COVERAGE_FN)?;
let ts = col_ts(b, 1, TABLE_COVERAGE_FN)?;
let r = col_str(b, 2, TABLE_COVERAGE_FN)?;
let name = col_str(b, 3, TABLE_COVERAGE_FN)?;
let file = col_str(b, 4, TABLE_COVERAGE_FN)?;
let boundary = col_str(b, 5, TABLE_COVERAGE_FN)?;
let lines = col_i64(b, 6, TABLE_COVERAGE_FN)?;
let lc = col_i64(b, 7, TABLE_COVERAGE_FN)?;
let regions = col_i64(b, 8, TABLE_COVERAGE_FN)?;
let rc = col_i64(b, 9, TABLE_COVERAGE_FN)?;
for i in 0..b.num_rows() {
if r.value(i) != repo {
continue;
}
out.push(CoverageFnRow {
run_id: run_id.value(i).to_string(),
ts_micros: ts.value(i),
repo: r.value(i).to_string(),
name: name.value(i).to_string(),
file: file.value(i).to_string(),
boundary: boundary.value(i).to_string(),
lines: lines.value(i),
lines_covered: lc.value(i),
regions: regions.value(i),
regions_covered: rc.value(i),
});
}
}
out.sort_by(|a, b| (a.ts_micros, &a.boundary, &a.name).cmp(&(b.ts_micros, &b.boundary, &b.name)));
Ok(out)
}
/// Returns the coverage rows and per-function rows of the most recent run for `repo`.
///
/// The newest run is identified by the largest `ts_micros` in the coverage table. When
/// several rows share that timestamp, the first one in `query_coverage`'s order supplies the
/// `run_id`. Both result sets are then restricted to that run's `(run_id, ts_micros)` pair.
/// Returns two empty vectors when `repo` has no coverage rows.
///
/// # Errors
/// Propagates errors from [`query_coverage`] and [`query_coverage_fn`].
pub async fn latest(
    wh: &IcebergWarehouse,
    repo: &str,
) -> Result<(Vec<CoverageRow>, Vec<CoverageFnRow>)> {
    let all = query_coverage(wh, repo).await?;
    // First row carrying the maximum timestamp.
    let newest = all
        .iter()
        .map(|r| r.ts_micros)
        .max()
        .and_then(|ts| all.iter().find(|r| r.ts_micros == ts));
    let Some(newest) = newest else {
        return Ok((Vec::new(), Vec::new()));
    };
    let (run_id, ts) = (newest.run_id.clone(), newest.ts_micros);
    // Match on the timestamp as well as the run id. A run id appended more than once
    // (e.g. a CI re-run that reuses its id) would otherwise mix an older snapshot into
    // the result. `rows_for` stamps coverage rows and fn rows of a run with the same ts.
    let rows: Vec<CoverageRow> = all
        .into_iter()
        .filter(|r| r.ts_micros == ts && r.run_id == run_id)
        .collect();
    let fns: Vec<CoverageFnRow> = query_coverage_fn(wh, repo)
        .await?
        .into_iter()
        .filter(|r| r.ts_micros == ts && r.run_id == run_id)
        .collect();
    Ok((rows, fns))
}
/// Borrows column `idx` of `b` as a [`StringArray`].
///
/// `t` is the table name and is used only in error messages.
///
/// # Errors
/// Fails if the batch has no column `idx`, or if that column is not a `StringArray`.
fn col_str<'a>(b: &'a RecordBatch, idx: usize, t: &str) -> Result<&'a StringArray> {
    // `RecordBatch::column` panics on an out-of-range index. A batch narrower than expected
    // (schema mismatch) should surface as an error, like a type mismatch does.
    let col = b
        .columns()
        .get(idx)
        .ok_or_else(|| anyhow!("{t} col {idx} missing (batch has {} columns)", b.num_columns()))?;
    col.as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| anyhow!("{t} col {idx} is not StringArray"))
}
/// Borrows column `idx` of `b` as an [`Int64Array`].
///
/// `t` is the table name and is used only in error messages.
///
/// # Errors
/// Fails if the batch has no column `idx`, or if that column is not an `Int64Array`.
fn col_i64<'a>(b: &'a RecordBatch, idx: usize, t: &str) -> Result<&'a Int64Array> {
    // `RecordBatch::column` panics on an out-of-range index. A batch narrower than expected
    // (schema mismatch) should surface as an error, like a type mismatch does.
    let col = b
        .columns()
        .get(idx)
        .ok_or_else(|| anyhow!("{t} col {idx} missing (batch has {} columns)", b.num_columns()))?;
    col.as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| anyhow!("{t} col {idx} is not Int64Array"))
}
/// Borrows column `idx` of `b` as a [`TimestampMicrosecondArray`].
///
/// The downcast matches any microsecond timestamp array regardless of its timezone.
/// `t` is the table name and is used only in error messages.
///
/// # Errors
/// Fails if the batch has no column `idx`, or if that column is not a
/// `TimestampMicrosecondArray`.
fn col_ts<'a>(b: &'a RecordBatch, idx: usize, t: &str) -> Result<&'a TimestampMicrosecondArray> {
    // `RecordBatch::column` panics on an out-of-range index. A batch narrower than expected
    // (schema mismatch) should surface as an error, like a type mismatch does.
    let col = b
        .columns()
        .get(idx)
        .ok_or_else(|| anyhow!("{t} col {idx} missing (batch has {} columns)", b.num_columns()))?;
    col.as_any()
        .downcast_ref::<TimestampMicrosecondArray>()
        .ok_or_else(|| anyhow!("{t} col {idx} is not TimestampMicrosecondArray"))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coverage::{summarise, Boundary, FileCoverage, FnCoverage};
    /// Two-file report: `src/viz/test_tab.rs` (a `Ui` and an `Emitter` function, partially
    /// covered) and `src/deps.rs` (one fully covered `Core` function).
    fn sample_report() -> CoverageReport {
        let viz = FileCoverage {
            file: "src/viz/test_tab.rs".into(),
            krate: "nornir".into(),
            lines: 10,
            lines_covered: 6,
            regions: 8,
            regions_covered: 5,
            functions: vec![
                FnCoverage {
                    name: "nornir::viz::test_tab::TestTabState::ui".into(),
                    file: "src/viz/test_tab.rs".into(),
                    lines: 4, lines_covered: 4, regions: 4, regions_covered: 4,
                    boundary: Boundary::Ui,
                },
                FnCoverage {
                    name: "nornir::viz::test_tab::TestTabState::state_json".into(),
                    file: "src/viz/test_tab.rs".into(),
                    lines: 3, lines_covered: 2, regions: 3, regions_covered: 1,
                    boundary: Boundary::Emitter,
                },
            ],
        };
        let core = FileCoverage {
            file: "src/deps.rs".into(),
            krate: "nornir".into(),
            lines: 5, lines_covered: 5, regions: 5, regions_covered: 5,
            functions: vec![FnCoverage {
                name: "nornir::deps::topo_sort".into(),
                file: "src/deps.rs".into(),
                lines: 5, lines_covered: 5, regions: 5, regions_covered: 5,
                boundary: Boundary::Core,
            }],
        };
        summarise("nornir", vec![viz, core]).unwrap()
    }
    #[test]
    fn append_query_round_trip_rollup_and_boundary() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let report = sample_report();
        let (rows, fn_rows) = rows_for(&report, "run-1", "nornir", 1000);
        assert_eq!(rows.len(), 4);
        assert_eq!(fn_rows.len(), 3);
        wh.block_on(append_report(&wh, &rows, &fn_rows)).unwrap();
        let back = wh.block_on(query_coverage(&wh, "nornir")).unwrap();
        assert_eq!(back.len(), 4);
        let overall = back.iter().find(|r| r.scope == "overall").unwrap();
        assert_eq!(overall.lines, 15, "10 + 5 lines");
        assert_eq!(overall.lines_covered, 11, "6 + 5 covered");
        assert!((overall.line_pct() - 11.0 / 15.0 * 100.0).abs() < 1e-9);
        let vizf = back.iter().find(|r| r.scope == "file:src/viz/test_tab.rs").unwrap();
        assert_eq!(vizf.krate, "nornir");
        assert!((vizf.line_pct() - 60.0).abs() < 1e-9);
        let fns = wh.block_on(query_coverage_fn(&wh, "nornir")).unwrap();
        assert_eq!(fns.len(), 3);
        let ui = fns.iter().find(|r| r.name.ends_with("::ui")).unwrap();
        assert_eq!(ui.boundary, "ui");
        assert!(ui.is_boundary());
        assert!(ui.exercised(), "ui fn covered => exercised");
        let emit = fns.iter().find(|r| r.name.ends_with("::state_json")).unwrap();
        assert_eq!(emit.boundary, "emitter");
        assert!(emit.is_boundary());
        assert!(emit.exercised(), "state_json has covered lines and regions");
        let core = fns.iter().find(|r| r.name.ends_with("::topo_sort")).unwrap();
        assert_eq!(core.boundary, "core");
        assert!(!core.is_boundary());
        let (lrows, lfns) = wh.block_on(latest(&wh, "nornir")).unwrap();
        assert_eq!(lrows.len(), 4);
        assert_eq!(lfns.len(), 3);
        let (er, ef) = wh.block_on(latest(&wh, "ghost")).unwrap();
        assert!(er.is_empty() && ef.is_empty());
    }
    #[test]
    fn latest_picks_newest_run() {
        let dir = tempfile::tempdir().unwrap();
        let wh = IcebergWarehouse::open(dir.path()).unwrap();
        let report = sample_report();
        let (a, af) = rows_for(&report, "old", "nornir", 1000);
        wh.block_on(append_report(&wh, &a, &af)).unwrap();
        let (b, bf) = rows_for(&report, "new", "nornir", 2000);
        wh.block_on(append_report(&wh, &b, &bf)).unwrap();
        let (lrows, lfns) = wh.block_on(latest(&wh, "nornir")).unwrap();
        // Check the counts too: `all` alone would pass vacuously on an empty result.
        assert_eq!(lrows.len(), 4, "exactly one run's coverage rows");
        assert!(lrows.iter().all(|r| r.run_id == "new"), "newest run only");
        assert_eq!(lfns.len(), 3, "exactly one run's fn rows");
        assert!(lfns.iter().all(|r| r.run_id == "new"), "newest run's fn rows only");
    }
}