use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{
Array, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
};
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::{
DataFileFormat, Datum, Literal, PartitionKey, PartitionSpec, Struct, Transform,
UnboundPartitionSpec,
};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use nornir_catalog::{RedbCatalog, RedbCatalogBuilder};
use parquet::file::properties::WriterProperties;
use tokio::runtime::Runtime;
use uuid::Uuid;
use super::iceberg_schema;
use super::{BenchFilter, Warehouse};
use crate::bench::{BenchResult, BenchRun, TestOutcome};
const NAMESPACE: &str = "nornir";
const TABLE_BENCH_RUNS: &str = "bench_runs";
const TABLE_BENCH_RESULTS: &str = "bench_results";
const TABLE_TEST_OUTCOMES: &str = "test_outcomes";
pub(crate) const TABLE_DEP_GRAPH_EDGES: &str = "dep_graph_edges";
pub(crate) const TABLE_RELEASE_LINEAGE: &str = "release_lineage";
pub(crate) const TABLE_FUNNEL_EVENTS: &str = "funnel_events";
pub(crate) const TABLE_TANTIVY_INDEX_SNAPSHOTS: &str = "tantivy_index_snapshots";
pub(crate) const TABLE_TANTIVY_INDEX_BLOBS: &str = "tantivy_index_blobs";
pub(crate) const TABLE_DWARF_SNAPSHOTS: &str = "dwarf_snapshots";
pub(crate) const TABLE_DWARF_BLOBS: &str = "dwarf_blobs";
pub(crate) const TABLE_GIMLI_SNAPSHOTS: &str = "gimli_snapshots";
pub(crate) const TABLE_GIMLI_BLOBS: &str = "gimli_blobs";
pub(crate) const TABLE_RUSTDOC_JSON_SNAPSHOTS: &str = "rustdoc_json_snapshots";
pub(crate) const TABLE_RUSTDOC_JSON_BLOBS: &str = "rustdoc_json_blobs";
pub(crate) const TABLE_EMBEDDINGS: &str = "embeddings";
pub(crate) const TABLE_EMBEDDING_SNAPSHOTS: &str = "embedding_snapshots";
pub(crate) const TABLE_EMBEDDING_MANIFEST: &str = "embedding_manifest";
pub(crate) const TABLE_PATH_DEP_AUDITS: &str = "path_dep_audits";
pub(crate) const TABLE_PATCH_STRIP_EVENTS: &str = "patch_strip_events";
pub(crate) const TABLE_PUBLISH_ATTEMPTS: &str = "publish_attempts";
pub(crate) const TABLE_CRATE_METADATA_CHECKS: &str = "crate_metadata_checks";
pub(crate) const TABLE_CRATE_TARBALL_STATS: &str = "crate_tarball_stats";
pub(crate) const TABLE_YANK_EVENTS: &str = "yank_events";
pub(crate) const TABLE_SEMVER_DIFFS: &str = "semver_diffs";
pub(crate) const TABLE_LINKS_DECLARATIONS: &str = "links_declarations";
pub(crate) const TABLE_RESOLVED_FEATURES: &str = "resolved_features";
pub(crate) const TABLE_MSRV_PROBES: &str = "msrv_probes";
pub(crate) const TABLE_TEST_IMPACTED_SELECTIONS: &str = "test_impacted_selections";
pub(crate) const TABLE_TEST_QUARANTINES: &str = "test_quarantines";
pub(crate) const TABLE_RELEASE_COMMITS: &str = "release_commits";
pub(crate) const TABLE_REGISTRY_MIRRORS: &str = "registry_mirrors";
pub(crate) const TABLE_VERSION_BUMP_PLANS: &str = "version_bump_plans";
pub(crate) const TABLE_VERSION_BUMP_TARGETS: &str = "version_bump_targets";
pub(crate) const TABLE_SYMBOL_FACTS: &str = "symbol_facts";
pub(crate) const TABLE_CALL_EDGES: &str = "call_edges";
pub(crate) const TABLE_FEATURE_GATE_FACTS: &str = "feature_gate_facts";
pub(crate) const TABLE_GIT_HEAT_FACTS: &str = "git_heat_facts";
pub(crate) const TABLE_DOC_EXPORTS: &str = "doc_exports";
pub(crate) const TABLE_DOCS_INDEX_SNAPSHOTS: &str = "docs_index_snapshots";
pub(crate) const TABLE_DOCS_INDEX_BLOBS: &str = "docs_index_blobs";
pub struct IcebergWarehouse {
catalog: Arc<RedbCatalog>,
rt: Option<Runtime>,
namespace: NamespaceIdent,
}
impl Drop for IcebergWarehouse {
fn drop(&mut self) {
if let Some(rt) = self.rt.take() {
rt.shutdown_background();
}
}
}
impl IcebergWarehouse {
pub fn open(root: &Path) -> Result<Self> {
std::fs::create_dir_all(root)
.with_context(|| format!("create warehouse root {}", root.display()))?;
let warehouse_dir = root.join("warehouse");
std::fs::create_dir_all(&warehouse_dir)
.with_context(|| format!("create warehouse data dir {}", warehouse_dir.display()))?;
let db_path = root.join("catalog.redb");
let warehouse_uri = format!("file://{}", warehouse_dir.canonicalize()?.display());
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("build tokio runtime")?;
let catalog = rt.block_on(async {
RedbCatalogBuilder::default()
.db_path(&db_path)
.warehouse_location(&warehouse_uri)
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await
})?;
let catalog = Arc::new(catalog);
let namespace = NamespaceIdent::new(NAMESPACE.to_string());
rt.block_on(ensure_layout(&catalog, &namespace))?;
Ok(Self { catalog, rt: Some(rt), namespace })
}
pub(crate) fn table_ident(&self, name: &str) -> TableIdent {
TableIdent::new(self.namespace.clone(), name.to_string())
}
}
impl IcebergWarehouse {
pub fn catalog(&self) -> &Arc<RedbCatalog> {
&self.catalog
}
pub fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
self.rt.as_ref().expect("rt present").block_on(fut)
}
}
async fn ensure_layout(catalog: &RedbCatalog, ns: &NamespaceIdent) -> Result<()> {
if !catalog.namespace_exists(ns).await? {
catalog.create_namespace(ns, HashMap::new()).await?;
}
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_BENCH_RUNS,
iceberg_schema::bench_runs()?,
&["repo"],
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_BENCH_RESULTS,
iceberg_schema::bench_results()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_TEST_OUTCOMES,
iceberg_schema::test_outcomes()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_DEP_GRAPH_EDGES,
iceberg_schema::dep_graph_edges()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_RELEASE_LINEAGE,
iceberg_schema::release_lineage()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_FUNNEL_EVENTS,
iceberg_schema::funnel_events()?,
)
.await?;
for (snap_name, blob_name) in [
(TABLE_TANTIVY_INDEX_SNAPSHOTS, TABLE_TANTIVY_INDEX_BLOBS),
(TABLE_DWARF_SNAPSHOTS, TABLE_DWARF_BLOBS),
(TABLE_GIMLI_SNAPSHOTS, TABLE_GIMLI_BLOBS),
(TABLE_RUSTDOC_JSON_SNAPSHOTS, TABLE_RUSTDOC_JSON_BLOBS),
(TABLE_DOCS_INDEX_SNAPSHOTS, TABLE_DOCS_INDEX_BLOBS),
] {
create_partitioned_table_if_missing(
catalog, ns, snap_name, iceberg_schema::artifact_snapshots()?, &["repo"],
)
.await?;
create_partitioned_table_if_missing(
catalog, ns, blob_name, iceberg_schema::artifact_blobs()?, &["snapshot_id"],
)
.await?;
}
create_table_if_missing(catalog, ns, TABLE_DOC_EXPORTS, iceberg_schema::doc_exports()?).await?;
create_table_if_missing(catalog, ns, TABLE_EMBEDDINGS, iceberg_schema::embeddings()?).await?;
create_table_if_missing(
catalog,
ns,
TABLE_EMBEDDING_SNAPSHOTS,
iceberg_schema::embedding_snapshots()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_EMBEDDING_MANIFEST,
iceberg_schema::embedding_manifest()?,
)
.await?;
for (name, schema) in [
(TABLE_PATH_DEP_AUDITS, iceberg_schema::path_dep_audits()?),
(TABLE_PATCH_STRIP_EVENTS, iceberg_schema::patch_strip_events()?),
(TABLE_PUBLISH_ATTEMPTS, iceberg_schema::publish_attempts()?),
(TABLE_CRATE_METADATA_CHECKS, iceberg_schema::crate_metadata_checks()?),
(TABLE_CRATE_TARBALL_STATS, iceberg_schema::crate_tarball_stats()?),
(TABLE_YANK_EVENTS, iceberg_schema::yank_events()?),
(TABLE_SEMVER_DIFFS, iceberg_schema::semver_diffs()?),
(TABLE_LINKS_DECLARATIONS, iceberg_schema::links_declarations()?),
(TABLE_RESOLVED_FEATURES, iceberg_schema::resolved_features()?),
(TABLE_MSRV_PROBES, iceberg_schema::msrv_probes()?),
(TABLE_TEST_IMPACTED_SELECTIONS, iceberg_schema::test_impacted_selections()?),
(TABLE_TEST_QUARANTINES, iceberg_schema::test_quarantines()?),
(TABLE_RELEASE_COMMITS, iceberg_schema::release_commits()?),
(TABLE_REGISTRY_MIRRORS, iceberg_schema::registry_mirrors()?),
(TABLE_VERSION_BUMP_PLANS, iceberg_schema::version_bump_plans()?),
(TABLE_VERSION_BUMP_TARGETS, iceberg_schema::version_bump_targets()?),
] {
create_table_if_missing(catalog, ns, name, schema).await?;
}
for (name, schema) in [
(TABLE_SYMBOL_FACTS, iceberg_schema::symbol_facts()?),
(TABLE_CALL_EDGES, iceberg_schema::call_edges()?),
(TABLE_FEATURE_GATE_FACTS, iceberg_schema::feature_gate_facts()?),
(TABLE_GIT_HEAT_FACTS, iceberg_schema::git_heat_facts()?),
] {
create_partitioned_table_if_missing(catalog, ns, name, schema, &["repo"]).await?;
}
Ok(())
}
async fn create_table_if_missing(
catalog: &RedbCatalog,
ns: &NamespaceIdent,
name: &str,
schema: iceberg::spec::Schema,
) -> Result<()> {
create_partitioned_table_if_missing(catalog, ns, name, schema, &[]).await
}
async fn create_partitioned_table_if_missing(
catalog: &RedbCatalog,
ns: &NamespaceIdent,
name: &str,
schema: iceberg::spec::Schema,
partition_cols: &[&str],
) -> Result<()> {
let ident = TableIdent::new(ns.clone(), name.to_string());
if catalog.table_exists(&ident).await? {
return Ok(());
}
let creation = if partition_cols.is_empty() {
TableCreation::builder().name(name.to_string()).schema(schema).build()
} else {
let spec = identity_partition_spec(&schema, partition_cols)?;
TableCreation::builder()
.name(name.to_string())
.schema(schema)
.partition_spec(spec)
.build()
};
catalog.create_table(ns, creation).await?;
Ok(())
}
fn identity_partition_spec(
schema: &iceberg::spec::Schema,
columns: &[&str],
) -> Result<UnboundPartitionSpec> {
let mut b = PartitionSpec::builder(Arc::new(schema.clone()));
for c in columns {
b = b.add_partition_field(c, (*c).to_string(), Transform::Identity)?;
}
Ok(b.build()?.into_unbound())
}
fn partition_key_for(table: &Table, batch: &RecordBatch) -> Result<Option<PartitionKey>> {
let meta = table.metadata();
let spec = meta.default_partition_spec();
if spec.fields().is_empty() {
return Ok(None);
}
let schema = meta.current_schema();
let mut lits: Vec<Option<Literal>> = Vec::with_capacity(spec.fields().len());
for f in spec.fields() {
if f.transform != Transform::Identity {
return Err(anyhow!(
"append_batch: only identity partition transforms are supported (got {:?} on `{}`)",
f.transform, f.name
));
}
let src = schema.field_by_id(f.source_id).ok_or_else(|| {
anyhow!("partition source field id {} not in schema", f.source_id)
})?;
let col = batch.column_by_name(&src.name).ok_or_else(|| {
anyhow!("partition column `{}` missing from batch", src.name)
})?;
let arr = col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
anyhow!("partition column `{}` is not Utf8 (only string identity partitions)", src.name)
})?;
if arr.is_empty() {
return Ok(None);
}
let v0 = arr.value(0);
for i in 1..arr.len() {
if arr.value(i) != v0 {
return Err(anyhow!(
"append_batch: batch spans multiple `{}` partitions (`{v0}` vs `{}`); \
single-partition appends only",
src.name, arr.value(i)
));
}
}
lits.push(Some(Literal::string(v0)));
}
Ok(Some(PartitionKey::new(
spec.as_ref().clone(),
schema.clone(),
Struct::from_iter(lits),
)))
}
impl IcebergWarehouse {
pub async fn append_bench_run_async(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
if run.machine.trim().is_empty() {
anyhow::bail!("BenchRun.machine is required");
}
let run_id = Uuid::new_v4();
let ts = resolve_timestamp(run)?;
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
let batch = build_bench_runs_batch(&table, run_id, repo, ts, run)?;
append_batch(&self.catalog, table, batch).await?;
let results_batch_opt = {
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
let batch = build_bench_results_batch(&table, run_id, run)?;
if batch.num_rows() > 0 { Some((table, batch)) } else { None }
};
if let Some((table, batch)) = results_batch_opt {
append_batch(&self.catalog, table, batch).await?;
}
let tests_batch_opt = {
let table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
let batch = build_test_outcomes_batch(&table, run_id, run)?;
if batch.num_rows() > 0 { Some((table, batch)) } else { None }
};
if let Some((table, batch)) = tests_batch_opt {
append_batch(&self.catalog, table, batch).await?;
}
Ok(run_id)
}
async fn artifact_snapshot_id_for(
&self,
table_name: &str,
repo: &str,
git_sha: &str,
) -> Result<Option<String>> {
let table = match self.catalog.load_table(&self.table_ident(table_name)).await {
Ok(t) => t,
Err(_) => return Ok(None),
};
let batches = scan_repo_filtered(&table, Some(repo)).await?;
let mut found = None;
for b in &batches {
let sid = downcast::<StringArray>(b, 0)?;
let sha = downcast::<StringArray>(b, 3)?;
for i in 0..b.num_rows() {
if sha.value(i) == git_sha {
found = Some(sid.value(i).to_string());
}
}
}
Ok(found)
}
pub async fn index_snapshot_id_for(&self, repo: &str, git_sha: &str) -> Result<Option<String>> {
self.artifact_snapshot_id_for(TABLE_TANTIVY_INDEX_SNAPSHOTS, repo, git_sha).await
}
pub async fn dwarf_snapshot_id_for(&self, repo: &str, git_sha: &str) -> Result<Option<String>> {
self.artifact_snapshot_id_for(TABLE_DWARF_SNAPSHOTS, repo, git_sha).await
}
}
impl Warehouse for IcebergWarehouse {
fn append_bench_run(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
self.rt.as_ref().expect("rt present").block_on(self.append_bench_run_async(repo, run))
}
fn query_bench_runs(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
self.rt.as_ref().expect("rt present").block_on(self.query_bench_runs_async(filter))
}
}
impl IcebergWarehouse {
pub async fn query_bench_runs_async(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
async {
let mut by_id: std::collections::HashMap<Uuid, BenchRun> =
std::collections::HashMap::new();
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
for batch in scan_repo_filtered(&table, filter.repo.as_deref()).await? {
let ids = downcast::<StringArray>(&batch, 0)?;
let repos = downcast::<StringArray>(&batch, 1)?;
let ts = downcast::<TimestampMicrosecondArray>(&batch, 2)?;
let dates = downcast::<StringArray>(&batch, 3)?;
let versions = downcast::<StringArray>(&batch, 4)?;
let machines = downcast::<StringArray>(&batch, 5)?;
let cores = downcast::<Int32Array>(&batch, 6)?;
for i in 0..batch.num_rows() {
if let Some(want) = &filter.repo {
if repos.value(i) != want { continue; }
}
if let Some(want) = &filter.machine {
if machines.value(i) != want { continue; }
}
let uid = Uuid::parse_str(ids.value(i))?;
let ts_dt = Utc.timestamp_micros(ts.value(i)).single()
.context("invalid micro timestamp")?;
by_id.insert(uid, BenchRun {
date: dates.value(i).to_string(),
timestamp: Some(ts_dt.to_rfc3339()),
version: versions.value(i).to_string(),
machine: machines.value(i).to_string(),
cores: cores.value(i).max(0) as u32,
results: Vec::new(),
tests: Vec::new(),
});
}
}
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
for batch in scan_all(&table).await? {
let ids = downcast::<StringArray>(&batch, 0)?;
let names = downcast::<StringArray>(&batch, 1)?;
let metrics = downcast::<StringArray>(&batch, 2)?;
let values = downcast::<Float64Array>(&batch, 3)?;
for i in 0..batch.num_rows() {
let uid = Uuid::parse_str(ids.value(i))?;
if let Some(run) = by_id.get_mut(&uid) {
let name = names.value(i).to_string();
let entry = run.results.iter_mut().find(|r| r.name == name);
let target = if let Some(e) = entry {
e
} else {
run.results.push(BenchResult { name, metrics: Default::default() });
run.results.last_mut().unwrap()
};
target.metrics.insert(
metrics.value(i).to_string(),
serde_json::Value::from(values.value(i)),
);
}
}
}
let table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
for batch in scan_all(&table).await? {
let ids = downcast::<StringArray>(&batch, 0)?;
let names = downcast::<StringArray>(&batch, 1)?;
let passed = downcast::<BooleanArray>(&batch, 2)?;
let durations = downcast::<Float64Array>(&batch, 3)?;
let messages = downcast::<StringArray>(&batch, 4)?;
for i in 0..batch.num_rows() {
let uid = Uuid::parse_str(ids.value(i))?;
if let Some(run) = by_id.get_mut(&uid) {
run.tests.push(TestOutcome {
name: names.value(i).to_string(),
passed: passed.value(i),
duration_ms: if durations.is_null(i) { None } else { Some(durations.value(i)) },
message: if messages.is_null(i) { None } else { Some(messages.value(i).to_string()) },
});
}
}
}
let mut out: Vec<BenchRun> = by_id.into_values().collect();
out.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
if let Some(n) = filter.limit {
let drop_n = out.len().saturating_sub(n);
out.drain(..drop_n);
}
anyhow::Ok(out)
}.await
}
}
pub(crate) async fn append_batch(catalog: &RedbCatalog, table: Table, batch: RecordBatch) -> Result<()> {
let parquet_builder =
ParquetWriterBuilder::new(WriterProperties::builder().build(), table.metadata().current_schema().clone());
let location_gen = DefaultLocationGenerator::new(table.metadata().clone())?;
let file_name_gen = DefaultFileNameGenerator::new(
"nornir".to_string(),
Some(Uuid::new_v4().to_string()),
DataFileFormat::Parquet,
);
let rolling = RollingFileWriterBuilder::new_with_default_file_size(
parquet_builder,
table.file_io().clone(),
location_gen,
file_name_gen,
);
let partition_key = partition_key_for(&table, &batch)?;
let mut writer = DataFileWriterBuilder::new(rolling).build(partition_key).await?;
writer.write(batch).await?;
let data_files = writer.close().await?;
if data_files.is_empty() {
return Err(anyhow!("append_batch: writer produced 0 data files"));
}
let tx = Transaction::new(&table);
let action = tx.fast_append().add_data_files(data_files);
let tx = action.apply(tx)?;
tx.commit(catalog).await?;
Ok(())
}
async fn scan_all(table: &Table) -> Result<Vec<RecordBatch>> {
let scan = table.scan().build()?;
let stream = scan.to_arrow().await?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
Ok(batches)
}
async fn scan_repo_filtered(table: &Table, repo: Option<&str>) -> Result<Vec<RecordBatch>> {
let mut builder = table.scan();
if let Some(r) = repo {
builder = builder.with_filter(Reference::new("repo").equal_to(Datum::string(r)));
}
let scan = builder.build()?;
let stream = scan.to_arrow().await?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
Ok(batches)
}
fn downcast<T: 'static>(batch: &RecordBatch, idx: usize) -> Result<&T> {
batch
.column(idx)
.as_any()
.downcast_ref::<T>()
.ok_or_else(|| anyhow!("column {idx} has unexpected type {:?}", batch.column(idx).data_type()))
}
fn resolve_timestamp(run: &BenchRun) -> Result<DateTime<Utc>> {
if let Some(s) = &run.timestamp {
return DateTime::parse_from_rfc3339(s)
.map(|dt| dt.with_timezone(&Utc))
.with_context(|| format!("parse timestamp {s}"));
}
let nd = NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
.with_context(|| format!("parse date {}", run.date))?;
let ndt: NaiveDateTime = nd.and_hms_opt(0, 0, 0).unwrap();
Ok(Utc.from_utc_datetime(&ndt))
}
fn build_bench_runs_batch(
table: &Table,
run_id: Uuid,
repo: &str,
ts: DateTime<Utc>,
run: &BenchRun,
) -> Result<RecordBatch> {
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![run_id.to_string()])),
Arc::new(StringArray::from(vec![repo.to_string()])),
Arc::new(TimestampMicrosecondArray::from(vec![ts.timestamp_micros()]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![run.date.clone()])),
Arc::new(StringArray::from(vec![run.version.clone()])),
Arc::new(StringArray::from(vec![run.machine.clone()])),
Arc::new(Int32Array::from(vec![run.cores as i32])),
];
Ok(RecordBatch::try_new(s, cols)?)
}
fn build_bench_results_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let id_str = run_id.to_string();
let mut ids = Vec::new();
let mut names = Vec::new();
let mut metrics = Vec::new();
let mut values = Vec::new();
for r in &run.results {
for (k, v) in &r.metrics {
let f = match v {
serde_json::Value::Number(n) => n.as_f64(),
_ => continue,
};
if let Some(f) = f {
ids.push(id_str.clone());
names.push(r.name.clone());
metrics.push(k.clone());
values.push(f);
}
}
}
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(ids)),
Arc::new(StringArray::from(names)),
Arc::new(StringArray::from(metrics)),
Arc::new(Float64Array::from(values)),
];
Ok(RecordBatch::try_new(s, cols)?)
}
fn build_test_outcomes_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let id_str = run_id.to_string();
let n = run.tests.len();
let ids: Vec<String> = run.tests.iter().map(|_| id_str.clone()).collect();
let names: Vec<String> = run.tests.iter().map(|t| t.name.clone()).collect();
let passed: Vec<bool> = run.tests.iter().map(|t| t.passed).collect();
let durations: Vec<Option<f64>> = run.tests.iter().map(|t| t.duration_ms).collect();
let messages: Vec<Option<String>> = run.tests.iter().map(|t| t.message.clone()).collect();
let nulls_string: Vec<Option<String>> = vec![None; n];
let nulls_int: Vec<Option<i32>> = vec![None; n];
let nulls_bool: Vec<Option<bool>> = vec![None; n];
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(ids)),
Arc::new(StringArray::from(names)),
Arc::new(BooleanArray::from(passed)),
Arc::new(Float64Array::from(durations)),
Arc::new(StringArray::from(messages)),
Arc::new(StringArray::from(nulls_string.clone())), Arc::new(StringArray::from(nulls_string.clone())), Arc::new(Int32Array::from(nulls_int.clone())), Arc::new(StringArray::from(nulls_string)), Arc::new(BooleanArray::from(nulls_bool)), ];
Ok(RecordBatch::try_new(s, cols)?)
}
#[allow(dead_code)]
fn _unused_imports_anchor() {}
impl IcebergWarehouse {
pub fn append_symbol_scan(
&self,
scan: &crate::knowledge::symbols::SymbolScan,
) -> Result<()> {
self.rt.as_ref().expect("rt present").block_on(self.append_symbol_scan_async(scan))
}
pub async fn append_symbol_scan_async(
&self,
scan: &crate::knowledge::symbols::SymbolScan,
) -> Result<()> {
let ts = scan.ts.timestamp_micros();
let snap = scan.snapshot_id.to_string();
if !scan.symbols.is_empty() {
let table = self.catalog.load_table(&self.table_ident(TABLE_SYMBOL_FACTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.symbols.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap.clone(); n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.crate_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.module_path.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.item_kind.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.item_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.visibility.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.symbols.iter().map(|r| r.line as i32).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.symbols.iter().map(|r| r.doc_lines as i32).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.signature.clone()).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
}
if !scan.calls.is_empty() {
let table = self.catalog.load_table(&self.table_ident(TABLE_CALL_EDGES)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.calls.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap.clone(); n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.crate_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.caller_path.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.callee_ident.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.call_kind.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.calls.iter().map(|r| r.line as i32).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
}
if !scan.features.is_empty() {
let table = self.catalog.load_table(&self.table_ident(TABLE_FEATURE_GATE_FACTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.features.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap.clone(); n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.crate_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.module_path.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.item_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.cfg_expr.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.features.iter().map(|r| r.line as i32).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
}
Ok(())
}
pub fn append_git_heat_scan(
&self,
scan: &crate::knowledge::git_heat::GitHeatScan,
) -> Result<()> {
self.rt.as_ref().expect("rt present").block_on(self.append_git_heat_scan_async(scan))
}
pub async fn append_git_heat_scan_async(
&self,
scan: &crate::knowledge::git_heat::GitHeatScan,
) -> Result<()> {
if scan.files.is_empty() {
return Ok(());
}
let ts = scan.ts.timestamp_micros();
let snap = scan.snapshot_id.to_string();
let table = self.catalog.load_table(&self.table_ident(TABLE_GIT_HEAT_FACTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.files.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap; n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.files.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.commits_total).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.commits_30d).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.commits_90d).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.authors_total).collect::<Vec<_>>())),
Arc::new(TimestampMicrosecondArray::from(scan.files.iter().map(|r| r.last_commit_ts.timestamp_micros()).collect::<Vec<_>>()).with_timezone("+00:00")),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bench::{BenchResult, BenchRun, TestOutcome};
fn sample_run(machine: &str, ops: f64) -> BenchRun {
let mut metrics = serde_json::Map::new();
metrics.insert("ops_sec".into(), serde_json::json!(ops));
BenchRun {
date: "2026-05-30".into(),
timestamp: Some("2026-05-30T21:00:00Z".into()),
version: "0.1.0".into(),
machine: machine.into(),
cores: 32,
results: vec![BenchResult { name: "x".into(), metrics }],
tests: vec![TestOutcome { name: "smoke".into(), passed: true, duration_ms: Some(1.5), message: None }],
}
}
#[test]
fn roundtrip_bench_run() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let _id = wh.append_bench_run("holger", &sample_run("ryzen", 123.0)).unwrap();
let runs = wh.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1, "expected exactly one bench run");
let r = &runs[0];
assert_eq!(r.machine, "ryzen");
assert_eq!(r.cores, 32);
assert_eq!(r.results.len(), 1);
assert_eq!(r.results[0].name, "x");
let ops = r.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap();
assert!((ops - 123.0).abs() < 1e-9);
assert_eq!(r.tests.len(), 1);
assert!(r.tests[0].passed);
assert_eq!(r.tests[0].duration_ms, Some(1.5));
}
#[test]
fn partitioned_bench_runs_scope_to_repo() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
wh.append_bench_run("znippy", &sample_run("ryzen", 2.0)).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 3.0)).unwrap();
let ops = |r: &BenchRun| r.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap();
let mut h: Vec<f64> = wh
.query_bench_runs(&BenchFilter::for_repo("holger"))
.unwrap()
.iter()
.map(ops)
.collect();
h.sort_by(|a, b| a.total_cmp(b));
assert_eq!(h, vec![1.0, 3.0], "only holger's two runs");
let z = wh.query_bench_runs(&BenchFilter::for_repo("znippy")).unwrap();
assert_eq!(z.len(), 1, "only znippy's one run");
assert!((ops(&z[0]) - 2.0).abs() < 1e-9);
}
#[test]
fn filter_by_machine() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
wh.append_bench_run("holger", &sample_run("threadripper", 2.0)).unwrap();
let filter = BenchFilter {
repo: Some("holger".into()),
machine: Some("threadripper".into()),
limit: None,
};
let runs = wh.query_bench_runs(&filter).unwrap();
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].machine, "threadripper");
}
#[test]
fn reopen_sees_previous_data() {
let dir = tempfile::tempdir().unwrap();
{
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 7.0)).unwrap();
}
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let runs = wh.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1);
}
}