use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{
Array, BooleanArray, Float64Array, Int32Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
};
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::DataFileFormat;
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use nornir_catalog::{RedbCatalog, RedbCatalogBuilder};
use parquet::file::properties::WriterProperties;
use tokio::runtime::Runtime;
use uuid::Uuid;
use super::iceberg_schema;
use super::{BenchFilter, Warehouse};
use crate::bench::{BenchResult, BenchRun, TestOutcome};
/// Name of the single namespace that `IcebergWarehouse::open` builds its
/// `NamespaceIdent` from.
const NAMESPACE: &str = "nornir";
/// Table holding one row per bench run (written by `build_bench_runs_batch`).
const TABLE_BENCH_RUNS: &str = "bench_runs";
/// Table holding one row per (run, result name, metric) value.
const TABLE_BENCH_RESULTS: &str = "bench_results";
/// Table holding one row per test outcome of a run.
const TABLE_TEST_OUTCOMES: &str = "test_outcomes";
// The tables below are created by `ensure_layout` but are not read or written
// anywhere in this file.
// NOTE(review): presumably `pub(crate)` so sibling modules can address them — confirm.
pub(crate) const TABLE_DEP_GRAPH_EDGES: &str = "dep_graph_edges";
pub(crate) const TABLE_RELEASE_LINEAGE: &str = "release_lineage";
pub(crate) const TABLE_FUNNEL_EVENTS: &str = "funnel_events";
// Every `*_snapshots` table is created with `iceberg_schema::artifact_snapshots()`
// and every `*_blobs` table with `iceberg_schema::artifact_blobs()`.
pub(crate) const TABLE_TANTIVY_INDEX_SNAPSHOTS: &str = "tantivy_index_snapshots";
pub(crate) const TABLE_TANTIVY_INDEX_BLOBS: &str = "tantivy_index_blobs";
pub(crate) const TABLE_DWARF_SNAPSHOTS: &str = "dwarf_snapshots";
pub(crate) const TABLE_DWARF_BLOBS: &str = "dwarf_blobs";
pub(crate) const TABLE_GIMLI_SNAPSHOTS: &str = "gimli_snapshots";
pub(crate) const TABLE_GIMLI_BLOBS: &str = "gimli_blobs";
/// Warehouse handle bundling a `RedbCatalog`, the Tokio runtime used to drive
/// the catalog's async calls from synchronous code, and the namespace that
/// table identifiers are built in.
pub struct IcebergWarehouse {
/// Shared catalog handle; also handed out by `catalog()`.
catalog: Arc<RedbCatalog>,
/// Runtime backing the blocking entry points. Kept in an `Option` so that
/// `Drop` can take ownership of it.
rt: Option<Runtime>,
/// Namespace used by `table_ident` when building table identifiers.
namespace: NamespaceIdent,
}
impl Drop for IcebergWarehouse {
    /// Takes the runtime out of the handle and shuts it down with
    /// `shutdown_background` rather than letting the field drop in place.
    ///
    /// NOTE(review): presumably this keeps the drop from blocking (or
    /// panicking) when the warehouse is dropped inside an async context —
    /// confirm against the Tokio documentation.
    fn drop(&mut self) {
        let Some(runtime) = self.rt.take() else {
            return;
        };
        runtime.shutdown_background();
    }
}
impl IcebergWarehouse {
pub fn open(root: &Path) -> Result<Self> {
std::fs::create_dir_all(root)
.with_context(|| format!("create warehouse root {}", root.display()))?;
let warehouse_dir = root.join("warehouse");
std::fs::create_dir_all(&warehouse_dir)
.with_context(|| format!("create warehouse data dir {}", warehouse_dir.display()))?;
let db_path = root.join("catalog.redb");
let warehouse_uri = format!("file://{}", warehouse_dir.canonicalize()?.display());
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("build tokio runtime")?;
let catalog = rt.block_on(async {
RedbCatalogBuilder::default()
.db_path(&db_path)
.warehouse_location(&warehouse_uri)
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await
})?;
let catalog = Arc::new(catalog);
let namespace = NamespaceIdent::new(NAMESPACE.to_string());
rt.block_on(ensure_layout(&catalog, &namespace))?;
Ok(Self { catalog, rt: Some(rt), namespace })
}
pub(crate) fn table_ident(&self, name: &str) -> TableIdent {
TableIdent::new(self.namespace.clone(), name.to_string())
}
}
impl IcebergWarehouse {
    /// Returns the shared catalog handle held by this warehouse.
    pub fn catalog(&self) -> &Arc<RedbCatalog> {
        &self.catalog
    }

    /// Runs `fut` to completion on the warehouse's own runtime and returns its
    /// output.
    ///
    /// # Panics
    ///
    /// Panics with `rt present` if the runtime has already been taken out of
    /// the handle; in this file only `Drop` does that.
    pub fn block_on<F>(&self, fut: F) -> F::Output
    where
        F: std::future::Future,
    {
        let runtime = self.rt.as_ref().expect("rt present");
        runtime.block_on(fut)
    }
}
async fn ensure_layout(catalog: &RedbCatalog, ns: &NamespaceIdent) -> Result<()> {
if !catalog.namespace_exists(ns).await? {
catalog.create_namespace(ns, HashMap::new()).await?;
}
create_table_if_missing(
catalog,
ns,
TABLE_BENCH_RUNS,
iceberg_schema::bench_runs()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_BENCH_RESULTS,
iceberg_schema::bench_results()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_TEST_OUTCOMES,
iceberg_schema::test_outcomes()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_DEP_GRAPH_EDGES,
iceberg_schema::dep_graph_edges()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_RELEASE_LINEAGE,
iceberg_schema::release_lineage()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_FUNNEL_EVENTS,
iceberg_schema::funnel_events()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_TANTIVY_INDEX_SNAPSHOTS,
iceberg_schema::artifact_snapshots()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_TANTIVY_INDEX_BLOBS,
iceberg_schema::artifact_blobs()?,
)
.await?;
for (snap_name, blob_name) in [
(TABLE_DWARF_SNAPSHOTS, TABLE_DWARF_BLOBS),
(TABLE_GIMLI_SNAPSHOTS, TABLE_GIMLI_BLOBS),
] {
create_table_if_missing(catalog, ns, snap_name, iceberg_schema::artifact_snapshots()?).await?;
create_table_if_missing(catalog, ns, blob_name, iceberg_schema::artifact_blobs()?).await?;
}
Ok(())
}
/// Creates table `name` with `schema` in namespace `ns` unless the catalog
/// already reports a table under that identifier.
///
/// An existing table is left untouched; its schema is not compared with
/// `schema`.
async fn create_table_if_missing(
    catalog: &RedbCatalog,
    ns: &NamespaceIdent,
    name: &str,
    schema: iceberg::spec::Schema,
) -> Result<()> {
    let ident = TableIdent::new(ns.clone(), name.to_string());
    let already_present = catalog.table_exists(&ident).await?;
    if !already_present {
        let creation = TableCreation::builder()
            .name(name.to_string())
            .schema(schema)
            .build();
        catalog.create_table(ns, creation).await?;
    }
    Ok(())
}
impl IcebergWarehouse {
    /// Stores `run` for `repo` and returns the freshly generated run id.
    ///
    /// One row goes to the bench-runs table. Numeric metrics and test outcomes
    /// go to their own tables, keyed by the same run id; a table is skipped
    /// when its batch would be empty.
    ///
    /// All three tables are loaded and all three batches are built before
    /// anything is committed. A batch that cannot be constructed therefore
    /// fails the call before a run row has been written. The commits are still
    /// three independent appends: if a later append fails, earlier ones stay
    /// in place.
    ///
    /// # Errors
    ///
    /// Fails when `run.machine` is blank, when the timestamp/date cannot be
    /// resolved, or when loading a table, building a batch, or appending fails.
    pub async fn append_bench_run_async(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
        if run.machine.trim().is_empty() {
            anyhow::bail!("BenchRun.machine is required");
        }
        let run_id = Uuid::new_v4();
        let ts = resolve_timestamp(run)?;

        // Phase 1: load tables and build every batch; nothing is written yet.
        let runs_table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
        let results_table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
        let tests_table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;

        let runs_batch = build_bench_runs_batch(&runs_table, run_id, repo, ts, run)?;
        let results_batch = build_bench_results_batch(&results_table, run_id, run)?;
        let tests_batch = build_test_outcomes_batch(&tests_table, run_id, run)?;

        // Phase 2: commit, run row first so child rows never precede it.
        append_batch(&self.catalog, runs_table, runs_batch).await?;
        if results_batch.num_rows() > 0 {
            append_batch(&self.catalog, results_table, results_batch).await?;
        }
        if tests_batch.num_rows() > 0 {
            append_batch(&self.catalog, tests_table, tests_batch).await?;
        }
        Ok(run_id)
    }
}
impl Warehouse for IcebergWarehouse {
    /// Blocking wrapper around `append_bench_run_async`, run on the
    /// warehouse's own runtime.
    fn append_bench_run(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
        self.block_on(self.append_bench_run_async(repo, run))
    }

    /// Reads bench runs matching `filter`, with their results and test
    /// outcomes attached.
    ///
    /// Each of the three tables is scanned in full. Columns are read by
    /// position, in the order the `build_*_batch` functions write them.
    /// `filter.repo` and `filter.machine` are exact-match filters on the run
    /// row; result and outcome rows whose run id was filtered out are ignored.
    ///
    /// Runs are returned oldest first, ordered by the stored microsecond
    /// timestamp with the run id as tie-breaker, so the order is deterministic
    /// even when several runs share a timestamp. With `filter.limit = Some(n)`
    /// only the last `n` runs of that order are kept.
    ///
    /// # Errors
    ///
    /// Fails when a table cannot be loaded or scanned, a column has an
    /// unexpected type, a run id is not a valid UUID, or a stored timestamp is
    /// out of range.
    fn query_bench_runs(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
        self.block_on(async {
            // Run id -> (timestamp in microseconds, run under construction).
            let mut by_id: HashMap<Uuid, (i64, BenchRun)> = HashMap::new();

            let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
            for batch in scan_all(&table).await? {
                let ids = downcast::<StringArray>(&batch, 0)?;
                let repos = downcast::<StringArray>(&batch, 1)?;
                let ts = downcast::<TimestampMicrosecondArray>(&batch, 2)?;
                let dates = downcast::<StringArray>(&batch, 3)?;
                let versions = downcast::<StringArray>(&batch, 4)?;
                let machines = downcast::<StringArray>(&batch, 5)?;
                let cores = downcast::<Int32Array>(&batch, 6)?;
                for i in 0..batch.num_rows() {
                    if let Some(want) = &filter.repo {
                        if repos.value(i) != want {
                            continue;
                        }
                    }
                    if let Some(want) = &filter.machine {
                        if machines.value(i) != want {
                            continue;
                        }
                    }
                    let uid = Uuid::parse_str(ids.value(i))?;
                    let micros = ts.value(i);
                    let ts_dt = Utc
                        .timestamp_micros(micros)
                        .single()
                        .context("invalid micro timestamp")?;
                    by_id.insert(
                        uid,
                        (
                            micros,
                            BenchRun {
                                date: dates.value(i).to_string(),
                                timestamp: Some(ts_dt.to_rfc3339()),
                                version: versions.value(i).to_string(),
                                machine: machines.value(i).to_string(),
                                // Negative stored values are reported as 0.
                                cores: u32::try_from(cores.value(i)).unwrap_or(0),
                                results: Vec::new(),
                                tests: Vec::new(),
                            },
                        ),
                    );
                }
            }

            let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
            for batch in scan_all(&table).await? {
                let ids = downcast::<StringArray>(&batch, 0)?;
                let names = downcast::<StringArray>(&batch, 1)?;
                let metrics = downcast::<StringArray>(&batch, 2)?;
                let values = downcast::<Float64Array>(&batch, 3)?;
                for i in 0..batch.num_rows() {
                    let uid = Uuid::parse_str(ids.value(i))?;
                    if let Some((_, run)) = by_id.get_mut(&uid) {
                        let name = names.value(i);
                        // Group metric rows under one `BenchResult` per name.
                        let slot = match run.results.iter().position(|r| r.name == name) {
                            Some(slot) => slot,
                            None => {
                                run.results.push(BenchResult {
                                    name: name.to_string(),
                                    metrics: Default::default(),
                                });
                                run.results.len() - 1
                            }
                        };
                        run.results[slot].metrics.insert(
                            metrics.value(i).to_string(),
                            serde_json::Value::from(values.value(i)),
                        );
                    }
                }
            }

            let table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
            for batch in scan_all(&table).await? {
                let ids = downcast::<StringArray>(&batch, 0)?;
                let names = downcast::<StringArray>(&batch, 1)?;
                let passed = downcast::<BooleanArray>(&batch, 2)?;
                let durations = downcast::<Float64Array>(&batch, 3)?;
                let messages = downcast::<StringArray>(&batch, 4)?;
                for i in 0..batch.num_rows() {
                    let uid = Uuid::parse_str(ids.value(i))?;
                    if let Some((_, run)) = by_id.get_mut(&uid) {
                        run.tests.push(TestOutcome {
                            name: names.value(i).to_string(),
                            passed: passed.value(i),
                            duration_ms: if durations.is_null(i) {
                                None
                            } else {
                                Some(durations.value(i))
                            },
                            message: if messages.is_null(i) {
                                None
                            } else {
                                Some(messages.value(i).to_string())
                            },
                        });
                    }
                }
            }

            // Sort on (timestamp, run id): map iteration order is arbitrary, so
            // without the id tie-breaker equal timestamps would come out in a
            // different order (and `limit` could drop different runs) per call.
            let mut ordered: Vec<(i64, Uuid, BenchRun)> = by_id
                .into_iter()
                .map(|(id, (micros, run))| (micros, id, run))
                .collect();
            ordered.sort_unstable_by_key(|entry| (entry.0, entry.1));
            if let Some(n) = filter.limit {
                let drop_n = ordered.len().saturating_sub(n);
                ordered.drain(..drop_n);
            }
            let out: Vec<BenchRun> = ordered.into_iter().map(|(_, _, run)| run).collect();
            anyhow::Ok(out)
        })
    }
}
pub(crate) async fn append_batch(catalog: &RedbCatalog, table: Table, batch: RecordBatch) -> Result<()> {
let parquet_builder =
ParquetWriterBuilder::new(WriterProperties::builder().build(), table.metadata().current_schema().clone());
let location_gen = DefaultLocationGenerator::new(table.metadata().clone())?;
let file_name_gen = DefaultFileNameGenerator::new(
"nornir".to_string(),
Some(Uuid::new_v4().to_string()),
DataFileFormat::Parquet,
);
let rolling = RollingFileWriterBuilder::new_with_default_file_size(
parquet_builder,
table.file_io().clone(),
location_gen,
file_name_gen,
);
let mut writer = DataFileWriterBuilder::new(rolling).build(None).await?;
writer.write(batch).await?;
let data_files = writer.close().await?;
if data_files.is_empty() {
return Err(anyhow!("append_batch: writer produced 0 data files"));
}
let tx = Transaction::new(&table);
let action = tx.fast_append().add_data_files(data_files);
let tx = action.apply(tx)?;
tx.commit(catalog).await?;
Ok(())
}
/// Scans `table` with a default-built scan and collects every Arrow record
/// batch of the resulting stream into a vector.
async fn scan_all(table: &Table) -> Result<Vec<RecordBatch>> {
    let collected = table
        .scan()
        .build()?
        .to_arrow()
        .await?
        .try_collect::<Vec<RecordBatch>>()
        .await?;
    Ok(collected)
}
/// Borrows column `idx` of `batch` as the concrete array type `T`.
///
/// # Errors
///
/// Returns an error when `idx` is past the last column of `batch` (instead of
/// panicking), or when the column's concrete type is not `T`.
fn downcast<T: 'static>(batch: &RecordBatch, idx: usize) -> Result<&T> {
    // `RecordBatch::column` panics on a bad index; go through the slice so a
    // batch with fewer columns than expected surfaces as an error.
    let column = batch.columns().get(idx).ok_or_else(|| {
        anyhow!(
            "column {idx} is out of range: batch has {} columns",
            batch.num_columns()
        )
    })?;
    column
        .as_any()
        .downcast_ref::<T>()
        .ok_or_else(|| anyhow!("column {idx} has unexpected type {:?}", column.data_type()))
}
/// Determines the UTC instant to store for `run`.
///
/// When `run.timestamp` is set it must parse as RFC 3339 and is converted to
/// UTC. Otherwise `run.date` is parsed as `%Y-%m-%d` and midnight UTC of that
/// day is used.
///
/// # Errors
///
/// Fails when the timestamp, or the date in the fallback case, does not parse.
fn resolve_timestamp(run: &BenchRun) -> Result<DateTime<Utc>> {
    match &run.timestamp {
        Some(s) => DateTime::parse_from_rfc3339(s)
            .map(|dt| dt.with_timezone(&Utc))
            .with_context(|| format!("parse timestamp {s}")),
        None => {
            let day = NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
                .with_context(|| format!("parse date {}", run.date))?;
            // 00:00:00 is always a valid time of day, so this cannot be `None`.
            let midnight: NaiveDateTime = day.and_hms_opt(0, 0, 0).unwrap();
            Ok(Utc.from_utc_datetime(&midnight))
        }
    }
}
/// Builds the single-row record batch describing `run` for the bench-runs
/// table.
///
/// Columns, in order: run id, repo, timestamp (microseconds, `+00:00`), date,
/// version, machine, cores. The Arrow schema is derived from `table`'s current
/// schema.
///
/// # Errors
///
/// Fails when `run.cores` does not fit the `Int32` column (previously such a
/// value wrapped silently), when the schema cannot be converted, or when the
/// columns do not match the schema.
fn build_bench_runs_batch(
    table: &Table,
    run_id: Uuid,
    repo: &str,
    ts: DateTime<Utc>,
    run: &BenchRun,
) -> Result<RecordBatch> {
    let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let cores = i32::try_from(run.cores)
        .with_context(|| format!("cores {} does not fit in a 32-bit signed column", run.cores))?;
    let cols: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![run_id.to_string()])),
        Arc::new(StringArray::from(vec![repo.to_string()])),
        Arc::new(
            TimestampMicrosecondArray::from(vec![ts.timestamp_micros()]).with_timezone("+00:00"),
        ),
        Arc::new(StringArray::from(vec![run.date.clone()])),
        Arc::new(StringArray::from(vec![run.version.clone()])),
        Arc::new(StringArray::from(vec![run.machine.clone()])),
        Arc::new(Int32Array::from(vec![cores])),
    ];
    Ok(RecordBatch::try_new(s, cols)?)
}
/// Flattens `run.results` into one row per (result name, metric) pair for the
/// bench-results table.
///
/// Columns, in order: run id, result name, metric name, value. Only metrics
/// that yield an `f64` are emitted; every other metric is skipped. The batch
/// may contain zero rows.
fn build_bench_results_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let run_key = run_id.to_string();

    let mut id_col: Vec<String> = Vec::new();
    let mut name_col: Vec<String> = Vec::new();
    let mut metric_col: Vec<String> = Vec::new();
    let mut value_col: Vec<f64> = Vec::new();
    for result in &run.results {
        for (metric, raw) in &result.metrics {
            // `as_f64` is `None` for non-numbers and for unrepresentable numbers.
            if let Some(number) = raw.as_f64() {
                id_col.push(run_key.clone());
                name_col.push(result.name.clone());
                metric_col.push(metric.clone());
                value_col.push(number);
            }
        }
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(id_col)),
        Arc::new(StringArray::from(name_col)),
        Arc::new(StringArray::from(metric_col)),
        Arc::new(Float64Array::from(value_col)),
    ];
    Ok(RecordBatch::try_new(arrow_schema, columns)?)
}
/// Builds one row per entry of `run.tests` for the test-outcomes table.
///
/// Columns, in order: run id, test name, passed, duration in milliseconds
/// (nullable), message (nullable). The batch may contain zero rows.
fn build_test_outcomes_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let row_count = run.tests.len();

    let id_col: Vec<String> = vec![run_id.to_string(); row_count];
    let mut name_col: Vec<String> = Vec::with_capacity(row_count);
    let mut passed_col: Vec<bool> = Vec::with_capacity(row_count);
    let mut duration_col: Vec<Option<f64>> = Vec::with_capacity(row_count);
    let mut message_col: Vec<Option<String>> = Vec::with_capacity(row_count);
    for outcome in &run.tests {
        name_col.push(outcome.name.clone());
        passed_col.push(outcome.passed);
        duration_col.push(outcome.duration_ms);
        message_col.push(outcome.message.clone());
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(id_col)),
        Arc::new(StringArray::from(name_col)),
        Arc::new(BooleanArray::from(passed_col)),
        Arc::new(Float64Array::from(duration_col)),
        Arc::new(StringArray::from(message_col)),
    ];
    Ok(RecordBatch::try_new(arrow_schema, columns)?)
}
// Empty placeholder function: the body contains no statements and references
// no imported names.
// NOTE(review): the name suggests it once kept otherwise-unused imports alive,
// but as written it cannot do that — confirm whether it (and the `allow`) can
// be removed.
#[allow(dead_code)]
fn _unused_imports_anchor() {}
// Round-trip tests for `IcebergWarehouse`; each test opens a warehouse in its
// own temporary directory.
#[cfg(test)]
mod tests {
use super::*;
use crate::bench::{BenchResult, BenchRun, TestOutcome};
// Builds a run for `machine` with one result "x" carrying a single `ops_sec`
// metric equal to `ops`, and one passing test "smoke". Date, timestamp,
// version and core count are fixed.
fn sample_run(machine: &str, ops: f64) -> BenchRun {
let mut metrics = serde_json::Map::new();
metrics.insert("ops_sec".into(), serde_json::json!(ops));
BenchRun {
date: "2026-05-30".into(),
timestamp: Some("2026-05-30T21:00:00Z".into()),
version: "0.1.0".into(),
machine: machine.into(),
cores: 32,
results: vec![BenchResult { name: "x".into(), metrics }],
tests: vec![TestOutcome { name: "smoke".into(), passed: true, duration_ms: Some(1.5), message: None }],
}
}
// Appends one run and checks that run fields, the metric value and the test
// outcome all come back from a query on the same repo.
#[test]
fn roundtrip_bench_run() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let _id = wh.append_bench_run("holger", &sample_run("ryzen", 123.0)).unwrap();
let runs = wh.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1, "expected exactly one bench run");
let r = &runs[0];
assert_eq!(r.machine, "ryzen");
assert_eq!(r.cores, 32);
assert_eq!(r.results.len(), 1);
assert_eq!(r.results[0].name, "x");
let ops = r.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap();
assert!((ops - 123.0).abs() < 1e-9);
assert_eq!(r.tests.len(), 1);
assert!(r.tests[0].passed);
assert_eq!(r.tests[0].duration_ms, Some(1.5));
}
// Appends runs for two machines and checks that the machine filter returns
// only the matching one.
#[test]
fn filter_by_machine() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
wh.append_bench_run("holger", &sample_run("threadripper", 2.0)).unwrap();
let filter = BenchFilter {
repo: Some("holger".into()),
machine: Some("threadripper".into()),
limit: None,
};
let runs = wh.query_bench_runs(&filter).unwrap();
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].machine, "threadripper");
}
// Appends through one handle, drops it, then reopens the same directory and
// checks the run is still visible.
#[test]
fn reopen_sees_previous_data() {
let dir = tempfile::tempdir().unwrap();
{
// Inner scope so the first handle is dropped before the reopen below.
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 7.0)).unwrap();
}
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let runs = wh.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1);
}
}