use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{
Array, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
};
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::{Datum, PartitionSpec, Transform, UnboundPartitionSpec};
use iceberg::table::Table;
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use skade_katalog::{RedbCatalog, RedbCatalogBuilder};
use tokio::runtime::Runtime;
use uuid::Uuid;
use super::iceberg_schema;
use super::{BenchFilter, Warehouse};
use crate::bench::{BenchResult, BenchRun, TestOutcome};
const NAMESPACE: &str = "nornir";
const TABLE_BENCH_RUNS: &str = "bench_runs";
const TABLE_BENCH_RESULTS: &str = "bench_results";
const TABLE_TEST_OUTCOMES: &str = "test_outcomes";
pub(crate) const TABLE_DEP_GRAPH_EDGES: &str = "dep_graph_edges";
pub(crate) const TABLE_RELEASE_LINEAGE: &str = "release_lineage";
pub(crate) const TABLE_FUNNEL_EVENTS: &str = "funnel_events";
pub(crate) const TABLE_TANTIVY_INDEX_SNAPSHOTS: &str = "tantivy_index_snapshots";
pub(crate) const TABLE_TANTIVY_INDEX_BLOBS: &str = "tantivy_index_blobs";
pub(crate) const TABLE_DWARF_SNAPSHOTS: &str = "dwarf_snapshots";
pub(crate) const TABLE_DWARF_BLOBS: &str = "dwarf_blobs";
pub(crate) const TABLE_GIMLI_SNAPSHOTS: &str = "gimli_snapshots";
pub(crate) const TABLE_GIMLI_BLOBS: &str = "gimli_blobs";
pub(crate) const TABLE_RUSTDOC_JSON_SNAPSHOTS: &str = "rustdoc_json_snapshots";
pub(crate) const TABLE_RUSTDOC_JSON_BLOBS: &str = "rustdoc_json_blobs";
pub(crate) const TABLE_EMBEDDINGS: &str = "embeddings";
pub(crate) const TABLE_EMBEDDING_SNAPSHOTS: &str = "embedding_snapshots";
pub(crate) const TABLE_EMBEDDING_MANIFEST: &str = "embedding_manifest";
pub(crate) const TABLE_PATH_DEP_AUDITS: &str = "path_dep_audits";
pub(crate) const TABLE_PATCH_STRIP_EVENTS: &str = "patch_strip_events";
pub(crate) const TABLE_PUBLISH_ATTEMPTS: &str = "publish_attempts";
pub(crate) const TABLE_CRATE_METADATA_CHECKS: &str = "crate_metadata_checks";
pub(crate) const TABLE_CRATE_TARBALL_STATS: &str = "crate_tarball_stats";
pub(crate) const TABLE_YANK_EVENTS: &str = "yank_events";
pub(crate) const TABLE_SEMVER_DIFFS: &str = "semver_diffs";
pub(crate) const TABLE_LINKS_DECLARATIONS: &str = "links_declarations";
pub(crate) const TABLE_RESOLVED_FEATURES: &str = "resolved_features";
pub(crate) const TABLE_MSRV_PROBES: &str = "msrv_probes";
pub(crate) const TABLE_TEST_IMPACTED_SELECTIONS: &str = "test_impacted_selections";
pub(crate) const TABLE_TEST_QUARANTINES: &str = "test_quarantines";
pub(crate) const TABLE_RELEASE_COMMITS: &str = "release_commits";
pub(crate) const TABLE_REGISTRY_MIRRORS: &str = "registry_mirrors";
pub(crate) const TABLE_VERSION_BUMP_PLANS: &str = "version_bump_plans";
pub(crate) const TABLE_VERSION_BUMP_TARGETS: &str = "version_bump_targets";
pub(crate) const TABLE_SYMBOL_FACTS: &str = "symbol_facts";
pub(crate) const TABLE_CALL_EDGES: &str = "call_edges";
pub(crate) const TABLE_FEATURE_GATE_FACTS: &str = "feature_gate_facts";
pub(crate) const TABLE_GIT_HEAT_FACTS: &str = "git_heat_facts";
pub(crate) const TABLE_DOC_EXPORTS: &str = "doc_exports";
pub(crate) const TABLE_MCP_REQUESTS: &str = "mcp_requests";
pub(crate) const TABLE_DOCS_INDEX_SNAPSHOTS: &str = "docs_index_snapshots";
pub(crate) const TABLE_DOCS_INDEX_BLOBS: &str = "docs_index_blobs";
pub struct IcebergWarehouse {
catalog: Arc<RedbCatalog>,
rt: Option<Runtime>,
namespace: NamespaceIdent,
}
impl Drop for IcebergWarehouse {
fn drop(&mut self) {
if let Some(rt) = self.rt.take() {
rt.shutdown_background();
}
}
}
impl IcebergWarehouse {
pub fn open(root: &Path) -> Result<Self> {
std::fs::create_dir_all(root)
.with_context(|| format!("create warehouse root {}", root.display()))?;
let warehouse_dir = root.join("warehouse");
std::fs::create_dir_all(&warehouse_dir)
.with_context(|| format!("create warehouse data dir {}", warehouse_dir.display()))?;
let db_path = root.join("catalog.redb");
let warehouse_uri = format!("file://{}", warehouse_dir.canonicalize()?.display());
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("build tokio runtime")?;
let catalog = rt.block_on(async {
RedbCatalogBuilder::default()
.db_path(&db_path)
.warehouse_location(&warehouse_uri)
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await
})?;
let catalog = Arc::new(catalog);
let namespace = NamespaceIdent::new(NAMESPACE.to_string());
rt.block_on(ensure_layout(&catalog, &namespace))?;
Ok(Self { catalog, rt: Some(rt), namespace })
}
pub(crate) fn table_ident(&self, name: &str) -> TableIdent {
TableIdent::new(self.namespace.clone(), name.to_string())
}
}
impl IcebergWarehouse {
pub fn catalog(&self) -> &Arc<RedbCatalog> {
&self.catalog
}
pub fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
self.rt.as_ref().expect("rt present").block_on(fut)
}
}
async fn ensure_layout(catalog: &RedbCatalog, ns: &NamespaceIdent) -> Result<()> {
if !catalog.namespace_exists(ns).await? {
catalog.create_namespace(ns, HashMap::new()).await?;
}
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_BENCH_RUNS,
iceberg_schema::bench_runs()?,
&["repo"],
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_BENCH_RESULTS,
iceberg_schema::bench_results()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_TEST_OUTCOMES,
iceberg_schema::test_outcomes()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_DEP_GRAPH_EDGES,
iceberg_schema::dep_graph_edges()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_RELEASE_LINEAGE,
iceberg_schema::release_lineage()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_FUNNEL_EVENTS,
iceberg_schema::funnel_events()?,
)
.await?;
for (snap_name, blob_name) in [
(TABLE_TANTIVY_INDEX_SNAPSHOTS, TABLE_TANTIVY_INDEX_BLOBS),
(TABLE_DWARF_SNAPSHOTS, TABLE_DWARF_BLOBS),
(TABLE_GIMLI_SNAPSHOTS, TABLE_GIMLI_BLOBS),
(TABLE_RUSTDOC_JSON_SNAPSHOTS, TABLE_RUSTDOC_JSON_BLOBS),
(TABLE_DOCS_INDEX_SNAPSHOTS, TABLE_DOCS_INDEX_BLOBS),
] {
create_partitioned_table_if_missing(
catalog, ns, snap_name, iceberg_schema::artifact_snapshots()?, &["repo"],
)
.await?;
create_partitioned_table_if_missing(
catalog, ns, blob_name, iceberg_schema::artifact_blobs()?, &["snapshot_id"],
)
.await?;
}
create_table_if_missing(catalog, ns, TABLE_DOC_EXPORTS, iceberg_schema::doc_exports()?).await?;
create_table_if_missing(catalog, ns, TABLE_EMBEDDINGS, iceberg_schema::embeddings()?).await?;
create_table_if_missing(
catalog,
ns,
TABLE_EMBEDDING_SNAPSHOTS,
iceberg_schema::embedding_snapshots()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_EMBEDDING_MANIFEST,
iceberg_schema::embedding_manifest()?,
)
.await?;
for (name, schema) in [
(TABLE_PATH_DEP_AUDITS, iceberg_schema::path_dep_audits()?),
(TABLE_PATCH_STRIP_EVENTS, iceberg_schema::patch_strip_events()?),
(TABLE_PUBLISH_ATTEMPTS, iceberg_schema::publish_attempts()?),
(TABLE_CRATE_METADATA_CHECKS, iceberg_schema::crate_metadata_checks()?),
(TABLE_CRATE_TARBALL_STATS, iceberg_schema::crate_tarball_stats()?),
(TABLE_YANK_EVENTS, iceberg_schema::yank_events()?),
(TABLE_SEMVER_DIFFS, iceberg_schema::semver_diffs()?),
(TABLE_LINKS_DECLARATIONS, iceberg_schema::links_declarations()?),
(TABLE_RESOLVED_FEATURES, iceberg_schema::resolved_features()?),
(TABLE_MSRV_PROBES, iceberg_schema::msrv_probes()?),
(TABLE_TEST_IMPACTED_SELECTIONS, iceberg_schema::test_impacted_selections()?),
(TABLE_TEST_QUARANTINES, iceberg_schema::test_quarantines()?),
(TABLE_RELEASE_COMMITS, iceberg_schema::release_commits()?),
(TABLE_REGISTRY_MIRRORS, iceberg_schema::registry_mirrors()?),
(TABLE_VERSION_BUMP_PLANS, iceberg_schema::version_bump_plans()?),
(TABLE_VERSION_BUMP_TARGETS, iceberg_schema::version_bump_targets()?),
(TABLE_MCP_REQUESTS, iceberg_schema::mcp_requests()?),
] {
create_table_if_missing(catalog, ns, name, schema).await?;
}
for (name, schema) in [
(TABLE_SYMBOL_FACTS, iceberg_schema::symbol_facts()?),
(TABLE_CALL_EDGES, iceberg_schema::call_edges()?),
(TABLE_FEATURE_GATE_FACTS, iceberg_schema::feature_gate_facts()?),
(TABLE_GIT_HEAT_FACTS, iceberg_schema::git_heat_facts()?),
] {
create_partitioned_table_if_missing(catalog, ns, name, schema, &["repo"]).await?;
}
Ok(())
}
async fn create_table_if_missing(
catalog: &RedbCatalog,
ns: &NamespaceIdent,
name: &str,
schema: iceberg::spec::Schema,
) -> Result<()> {
create_partitioned_table_if_missing(catalog, ns, name, schema, &[]).await
}
async fn create_partitioned_table_if_missing(
catalog: &RedbCatalog,
ns: &NamespaceIdent,
name: &str,
schema: iceberg::spec::Schema,
partition_cols: &[&str],
) -> Result<()> {
let ident = TableIdent::new(ns.clone(), name.to_string());
if catalog.table_exists(&ident).await? {
return Ok(());
}
let creation = if partition_cols.is_empty() {
TableCreation::builder().name(name.to_string()).schema(schema).build()
} else {
let spec = identity_partition_spec(&schema, partition_cols)?;
TableCreation::builder()
.name(name.to_string())
.schema(schema)
.partition_spec(spec)
.build()
};
catalog.create_table(ns, creation).await?;
Ok(())
}
fn identity_partition_spec(
schema: &iceberg::spec::Schema,
columns: &[&str],
) -> Result<UnboundPartitionSpec> {
let mut b = PartitionSpec::builder(Arc::new(schema.clone()));
for c in columns {
b = b.add_partition_field(c, (*c).to_string(), Transform::Identity)?;
}
Ok(b.build()?.into_unbound())
}
impl IcebergWarehouse {
pub async fn append_bench_run_async(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
if run.machine.trim().is_empty() {
anyhow::bail!("BenchRun.machine is required");
}
let run_id = Uuid::new_v4();
let ts = resolve_timestamp(run)?;
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
let batch = build_bench_runs_batch(&table, run_id, repo, ts, run)?;
append_batch(&self.catalog, table, batch).await?;
let results_batch_opt = {
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
let batch = build_bench_results_batch(&table, run_id, run)?;
if batch.num_rows() > 0 { Some((table, batch)) } else { None }
};
if let Some((table, batch)) = results_batch_opt {
append_batch(&self.catalog, table, batch).await?;
}
let tests_batch_opt = {
let table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
let batch = build_test_outcomes_batch(&table, run_id, run)?;
if batch.num_rows() > 0 { Some((table, batch)) } else { None }
};
if let Some((table, batch)) = tests_batch_opt {
append_batch(&self.catalog, table, batch).await?;
}
Ok(run_id)
}
async fn artifact_snapshot_id_for(
&self,
table_name: &str,
repo: &str,
git_sha: &str,
) -> Result<Option<String>> {
let table = match self.catalog.load_table(&self.table_ident(table_name)).await {
Ok(t) => t,
Err(_) => return Ok(None),
};
let batches = scan_repo_filtered(&table, Some(repo)).await?;
let mut found = None;
for b in &batches {
let sid = downcast::<StringArray>(b, 0)?;
let sha = downcast::<StringArray>(b, 3)?;
for i in 0..b.num_rows() {
if sha.value(i) == git_sha {
found = Some(sid.value(i).to_string());
}
}
}
Ok(found)
}
pub async fn index_snapshot_id_for(&self, repo: &str, git_sha: &str) -> Result<Option<String>> {
self.artifact_snapshot_id_for(TABLE_TANTIVY_INDEX_SNAPSHOTS, repo, git_sha).await
}
pub async fn dwarf_snapshot_id_for(&self, repo: &str, git_sha: &str) -> Result<Option<String>> {
self.artifact_snapshot_id_for(TABLE_DWARF_SNAPSHOTS, repo, git_sha).await
}
pub fn table_names(&self) -> Result<Vec<String>> {
self.rt.as_ref().expect("rt present").block_on(async {
let mut names: Vec<String> = self
.catalog
.list_tables(&self.namespace)
.await?
.into_iter()
.map(|t| t.name().to_string())
.collect();
names.sort();
Ok(names)
})
}
pub fn scan_preview(&self, table: &str, limit: usize) -> Result<TablePreview> {
self.rt.as_ref().expect("rt present").block_on(async {
let t = self.catalog.load_table(&self.table_ident(table)).await?;
let batches = scan_all(&t).await?;
let mut columns: Vec<String> = Vec::new();
let mut rows: Vec<Vec<String>> = Vec::new();
'outer: for b in &batches {
if columns.is_empty() {
columns = b.schema().fields().iter().map(|f| f.name().to_string()).collect();
}
for i in 0..b.num_rows() {
if rows.len() >= limit {
break 'outer;
}
let row = (0..b.num_columns())
.map(|c| cell_to_string(b.column(c).as_ref(), i))
.collect();
rows.push(row);
}
}
Ok(TablePreview { columns, rows })
})
}
}
#[derive(Debug, Clone, Default)]
pub struct TablePreview {
pub columns: Vec<String>,
pub rows: Vec<Vec<String>>,
}
fn cell_to_string(arr: &dyn Array, row: usize) -> String {
if arr.is_null(row) {
return String::new();
}
if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
return a.value(row).to_string();
}
if let Some(a) = arr.as_any().downcast_ref::<Int64Array>() {
return a.value(row).to_string();
}
if let Some(a) = arr.as_any().downcast_ref::<Int32Array>() {
return a.value(row).to_string();
}
if let Some(a) = arr.as_any().downcast_ref::<Float64Array>() {
return format!("{:.4}", a.value(row));
}
if let Some(a) = arr.as_any().downcast_ref::<BooleanArray>() {
return a.value(row).to_string();
}
format!("<{}>", arr.data_type())
}
impl Warehouse for IcebergWarehouse {
fn append_bench_run(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
self.rt.as_ref().expect("rt present").block_on(self.append_bench_run_async(repo, run))
}
fn query_bench_runs(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
self.rt.as_ref().expect("rt present").block_on(self.query_bench_runs_async(filter))
}
}
impl IcebergWarehouse {
pub async fn query_bench_runs_async(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
async {
let mut by_id: std::collections::HashMap<Uuid, BenchRun> =
std::collections::HashMap::new();
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
for batch in scan_repo_filtered(&table, filter.repo.as_deref()).await? {
let ids = downcast::<StringArray>(&batch, 0)?;
let repos = downcast::<StringArray>(&batch, 1)?;
let ts = downcast::<TimestampMicrosecondArray>(&batch, 2)?;
let dates = downcast::<StringArray>(&batch, 3)?;
let versions = downcast::<StringArray>(&batch, 4)?;
let machines = downcast::<StringArray>(&batch, 5)?;
let cores = downcast::<Int32Array>(&batch, 6)?;
for i in 0..batch.num_rows() {
if let Some(want) = &filter.repo {
if repos.value(i) != want { continue; }
}
if let Some(want) = &filter.machine {
if machines.value(i) != want { continue; }
}
let uid = Uuid::parse_str(ids.value(i))?;
let ts_dt = Utc.timestamp_micros(ts.value(i)).single()
.context("invalid micro timestamp")?;
by_id.insert(uid, BenchRun {
date: dates.value(i).to_string(),
timestamp: Some(ts_dt.to_rfc3339()),
version: versions.value(i).to_string(),
machine: machines.value(i).to_string(),
cores: cores.value(i).max(0) as u32,
results: Vec::new(),
tests: Vec::new(),
});
}
}
let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
for batch in scan_all(&table).await? {
let ids = downcast::<StringArray>(&batch, 0)?;
let names = downcast::<StringArray>(&batch, 1)?;
let metrics = downcast::<StringArray>(&batch, 2)?;
let values = downcast::<Float64Array>(&batch, 3)?;
for i in 0..batch.num_rows() {
let uid = Uuid::parse_str(ids.value(i))?;
if let Some(run) = by_id.get_mut(&uid) {
let name = names.value(i).to_string();
let entry = run.results.iter_mut().find(|r| r.name == name);
let target = if let Some(e) = entry {
e
} else {
run.results.push(BenchResult { name, metrics: Default::default() });
run.results.last_mut().unwrap()
};
target.metrics.insert(
metrics.value(i).to_string(),
serde_json::Value::from(values.value(i)),
);
}
}
}
let table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
for batch in scan_all(&table).await? {
let ids = downcast::<StringArray>(&batch, 0)?;
let names = downcast::<StringArray>(&batch, 1)?;
let passed = downcast::<BooleanArray>(&batch, 2)?;
let durations = downcast::<Float64Array>(&batch, 3)?;
let messages = downcast::<StringArray>(&batch, 4)?;
for i in 0..batch.num_rows() {
let uid = Uuid::parse_str(ids.value(i))?;
if let Some(run) = by_id.get_mut(&uid) {
run.tests.push(TestOutcome {
name: names.value(i).to_string(),
passed: passed.value(i),
duration_ms: if durations.is_null(i) { None } else { Some(durations.value(i)) },
message: if messages.is_null(i) { None } else { Some(messages.value(i).to_string()) },
});
}
}
}
let mut out: Vec<BenchRun> = by_id.into_values().collect();
out.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
if let Some(n) = filter.limit {
let drop_n = out.len().saturating_sub(n);
out.drain(..drop_n);
}
anyhow::Ok(out)
}.await
}
}
pub(crate) async fn append_batch(catalog: &RedbCatalog, table: Table, batch: RecordBatch) -> Result<()> {
skade::append(catalog, &table, std::slice::from_ref(&batch)).await?;
Ok(())
}
async fn scan_all(table: &Table) -> Result<Vec<RecordBatch>> {
Ok(skade::read_all(table).await?)
}
async fn scan_repo_filtered(table: &Table, repo: Option<&str>) -> Result<Vec<RecordBatch>> {
let mut builder = table.scan();
if let Some(r) = repo {
builder = builder.with_filter(Reference::new("repo").equal_to(Datum::string(r)));
}
let scan = builder.build()?;
let stream = scan.to_arrow().await?;
let batches: Vec<RecordBatch> = stream.try_collect().await?;
Ok(batches)
}
fn downcast<T: 'static>(batch: &RecordBatch, idx: usize) -> Result<&T> {
batch
.column(idx)
.as_any()
.downcast_ref::<T>()
.ok_or_else(|| anyhow!("column {idx} has unexpected type {:?}", batch.column(idx).data_type()))
}
fn resolve_timestamp(run: &BenchRun) -> Result<DateTime<Utc>> {
if let Some(s) = &run.timestamp {
return DateTime::parse_from_rfc3339(s)
.map(|dt| dt.with_timezone(&Utc))
.with_context(|| format!("parse timestamp {s}"));
}
let nd = NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
.with_context(|| format!("parse date {}", run.date))?;
let ndt: NaiveDateTime = nd.and_hms_opt(0, 0, 0).unwrap();
Ok(Utc.from_utc_datetime(&ndt))
}
fn build_bench_runs_batch(
table: &Table,
run_id: Uuid,
repo: &str,
ts: DateTime<Utc>,
run: &BenchRun,
) -> Result<RecordBatch> {
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![run_id.to_string()])),
Arc::new(StringArray::from(vec![repo.to_string()])),
Arc::new(TimestampMicrosecondArray::from(vec![ts.timestamp_micros()]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![run.date.clone()])),
Arc::new(StringArray::from(vec![run.version.clone()])),
Arc::new(StringArray::from(vec![run.machine.clone()])),
Arc::new(Int32Array::from(vec![run.cores as i32])),
];
Ok(RecordBatch::try_new(s, cols)?)
}
fn build_bench_results_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let id_str = run_id.to_string();
let mut ids = Vec::new();
let mut names = Vec::new();
let mut metrics = Vec::new();
let mut values = Vec::new();
for r in &run.results {
for (k, v) in &r.metrics {
let f = match v {
serde_json::Value::Number(n) => n.as_f64(),
_ => continue,
};
if let Some(f) = f {
ids.push(id_str.clone());
names.push(r.name.clone());
metrics.push(k.clone());
values.push(f);
}
}
}
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(ids)),
Arc::new(StringArray::from(names)),
Arc::new(StringArray::from(metrics)),
Arc::new(Float64Array::from(values)),
];
Ok(RecordBatch::try_new(s, cols)?)
}
fn build_test_outcomes_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let id_str = run_id.to_string();
let n = run.tests.len();
let ids: Vec<String> = run.tests.iter().map(|_| id_str.clone()).collect();
let names: Vec<String> = run.tests.iter().map(|t| t.name.clone()).collect();
let passed: Vec<bool> = run.tests.iter().map(|t| t.passed).collect();
let durations: Vec<Option<f64>> = run.tests.iter().map(|t| t.duration_ms).collect();
let messages: Vec<Option<String>> = run.tests.iter().map(|t| t.message.clone()).collect();
let nulls_string: Vec<Option<String>> = vec![None; n];
let nulls_int: Vec<Option<i32>> = vec![None; n];
let nulls_bool: Vec<Option<bool>> = vec![None; n];
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(ids)),
Arc::new(StringArray::from(names)),
Arc::new(BooleanArray::from(passed)),
Arc::new(Float64Array::from(durations)),
Arc::new(StringArray::from(messages)),
Arc::new(StringArray::from(nulls_string.clone())), Arc::new(StringArray::from(nulls_string.clone())), Arc::new(Int32Array::from(nulls_int.clone())), Arc::new(StringArray::from(nulls_string)), Arc::new(BooleanArray::from(nulls_bool)), ];
Ok(RecordBatch::try_new(s, cols)?)
}
#[allow(dead_code)]
fn _unused_imports_anchor() {}
impl IcebergWarehouse {
pub fn append_symbol_scan(
&self,
scan: &crate::knowledge::symbols::SymbolScan,
) -> Result<()> {
self.rt.as_ref().expect("rt present").block_on(self.append_symbol_scan_async(scan))
}
pub fn record_dep_graph(
&self,
workspace_name: &str,
graph: &crate::warehouse::dep_graph::WorkspaceGraph,
) -> Result<uuid::Uuid> {
self.rt.as_ref().expect("rt present").block_on(
crate::warehouse::dep_graph::record_dep_graph(self, workspace_name, graph),
)
}
pub async fn append_symbol_scan_async(
&self,
scan: &crate::knowledge::symbols::SymbolScan,
) -> Result<()> {
let ts = scan.ts.timestamp_micros();
let snap = scan.snapshot_id.to_string();
if !scan.symbols.is_empty() {
let table = self.catalog.load_table(&self.table_ident(TABLE_SYMBOL_FACTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.symbols.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap.clone(); n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.crate_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.module_path.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.item_kind.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.item_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.visibility.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.symbols.iter().map(|r| r.line as i32).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.symbols.iter().map(|r| r.doc_lines as i32).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.symbols.iter().map(|r| r.signature.clone()).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
}
if !scan.calls.is_empty() {
let table = self.catalog.load_table(&self.table_ident(TABLE_CALL_EDGES)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.calls.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap.clone(); n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.crate_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.caller_path.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.callee_ident.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.call_kind.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.calls.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.calls.iter().map(|r| r.line as i32).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
}
if !scan.features.is_empty() {
let table = self.catalog.load_table(&self.table_ident(TABLE_FEATURE_GATE_FACTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.features.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap.clone(); n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.crate_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.module_path.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.item_name.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.cfg_expr.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(scan.features.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int32Array::from(scan.features.iter().map(|r| r.line as i32).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
}
Ok(())
}
pub fn append_git_heat_scan(
&self,
scan: &crate::knowledge::git_heat::GitHeatScan,
) -> Result<()> {
self.rt.as_ref().expect("rt present").block_on(self.append_git_heat_scan_async(scan))
}
pub async fn append_git_heat_scan_async(
&self,
scan: &crate::knowledge::git_heat::GitHeatScan,
) -> Result<()> {
if scan.files.is_empty() {
return Ok(());
}
let ts = scan.ts.timestamp_micros();
let snap = scan.snapshot_id.to_string();
let table = self.catalog.load_table(&self.table_ident(TABLE_GIT_HEAT_FACTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let n = scan.files.len();
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(StringArray::from(vec![snap; n])),
Arc::new(TimestampMicrosecondArray::from(vec![ts; n]).with_timezone("+00:00")),
Arc::new(StringArray::from(vec![scan.repo.clone(); n])),
Arc::new(StringArray::from(scan.files.iter().map(|r| r.file.clone()).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.commits_total).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.commits_30d).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.commits_90d).collect::<Vec<_>>())),
Arc::new(Int64Array::from(scan.files.iter().map(|r| r.authors_total).collect::<Vec<_>>())),
Arc::new(TimestampMicrosecondArray::from(scan.files.iter().map(|r| r.last_commit_ts.timestamp_micros()).collect::<Vec<_>>()).with_timezone("+00:00")),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
Ok(())
}
pub async fn append_mcp_calls_async(&self, calls: &[McpCall]) -> Result<()> {
if calls.is_empty() {
return Ok(());
}
let table = self.catalog.load_table(&self.table_ident(TABLE_MCP_REQUESTS)).await?;
let s = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
let cols: Vec<Arc<dyn Array>> = vec![
Arc::new(
TimestampMicrosecondArray::from(calls.iter().map(|c| c.ts_micros).collect::<Vec<_>>())
.with_timezone("+00:00"),
),
Arc::new(StringArray::from(calls.iter().map(|c| c.tool.clone()).collect::<Vec<_>>())),
Arc::new(StringArray::from(calls.iter().map(|c| c.status.clone()).collect::<Vec<_>>())),
Arc::new(Int64Array::from(calls.iter().map(|c| c.latency_ms).collect::<Vec<_>>())),
];
let batch = RecordBatch::try_new(s, cols)?;
append_batch(&self.catalog, table, batch).await?;
Ok(())
}
pub fn append_mcp_calls(&self, calls: &[McpCall]) -> Result<()> {
self.rt.as_ref().expect("rt present").block_on(self.append_mcp_calls_async(calls))
}
pub async fn query_mcp_stats_async(&self) -> Result<Vec<McpToolStat>> {
let table = self.catalog.load_table(&self.table_ident(TABLE_MCP_REQUESTS)).await?;
let batches = scan_all(&table).await?;
struct Acc {
calls: u64,
errors: u64,
lat_sum: i128,
last_ts: i64,
}
let mut map: std::collections::HashMap<String, Acc> = std::collections::HashMap::new();
for b in &batches {
let ts = downcast::<TimestampMicrosecondArray>(b, 0)?;
let tool = downcast::<StringArray>(b, 1)?;
let status = downcast::<StringArray>(b, 2)?;
let lat = downcast::<Int64Array>(b, 3)?;
for i in 0..b.num_rows() {
let e = map.entry(tool.value(i).to_string()).or_insert(Acc {
calls: 0,
errors: 0,
lat_sum: 0,
last_ts: i64::MIN,
});
e.calls += 1;
if status.value(i) != "ok" {
e.errors += 1;
}
e.lat_sum += lat.value(i) as i128;
e.last_ts = e.last_ts.max(ts.value(i));
}
}
let mut out: Vec<McpToolStat> = map
.into_iter()
.map(|(tool, a)| McpToolStat {
tool,
calls: a.calls,
errors: a.errors,
avg_latency_ms: if a.calls > 0 { a.lat_sum as f64 / a.calls as f64 } else { 0.0 },
last_ts_micros: a.last_ts,
})
.collect();
out.sort_by(|a, b| b.calls.cmp(&a.calls).then_with(|| a.tool.cmp(&b.tool)));
Ok(out)
}
pub fn query_mcp_stats(&self) -> Result<Vec<McpToolStat>> {
self.rt.as_ref().expect("rt present").block_on(self.query_mcp_stats_async())
}
}
#[derive(Debug, Clone)]
pub struct McpCall {
pub ts_micros: i64,
pub tool: String,
pub status: String,
pub latency_ms: i64,
}
#[derive(Debug, Clone)]
pub struct McpToolStat {
pub tool: String,
pub calls: u64,
pub errors: u64,
pub avg_latency_ms: f64,
pub last_ts_micros: i64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bench::{BenchResult, BenchRun, TestOutcome};
fn sample_run(machine: &str, ops: f64) -> BenchRun {
let mut metrics = serde_json::Map::new();
metrics.insert("ops_sec".into(), serde_json::json!(ops));
BenchRun {
date: "2026-05-30".into(),
timestamp: Some("2026-05-30T21:00:00Z".into()),
version: "0.1.0".into(),
machine: machine.into(),
cores: 32,
results: vec![BenchResult { name: "x".into(), metrics }],
tests: vec![TestOutcome { name: "smoke".into(), passed: true, duration_ms: Some(1.5), message: None }],
}
}
#[test]
fn roundtrip_bench_run() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let _id = wh.append_bench_run("holger", &sample_run("ryzen", 123.0)).unwrap();
let runs = wh.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1, "expected exactly one bench run");
let r = &runs[0];
assert_eq!(r.machine, "ryzen");
assert_eq!(r.cores, 32);
assert_eq!(r.results.len(), 1);
assert_eq!(r.results[0].name, "x");
let ops = r.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap();
assert!((ops - 123.0).abs() < 1e-9);
assert_eq!(r.tests.len(), 1);
assert!(r.tests[0].passed);
assert_eq!(r.tests[0].duration_ms, Some(1.5));
}
#[test]
fn partitioned_bench_runs_scope_to_repo() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
wh.append_bench_run("znippy", &sample_run("ryzen", 2.0)).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 3.0)).unwrap();
let ops = |r: &BenchRun| r.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap();
let mut h: Vec<f64> = wh
.query_bench_runs(&BenchFilter::for_repo("holger"))
.unwrap()
.iter()
.map(ops)
.collect();
h.sort_by(|a, b| a.total_cmp(b));
assert_eq!(h, vec![1.0, 3.0], "only holger's two runs");
let z = wh.query_bench_runs(&BenchFilter::for_repo("znippy")).unwrap();
assert_eq!(z.len(), 1, "only znippy's one run");
assert!((ops(&z[0]) - 2.0).abs() < 1e-9);
}
#[test]
fn filter_by_machine() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
wh.append_bench_run("holger", &sample_run("threadripper", 2.0)).unwrap();
let filter = BenchFilter {
repo: Some("holger".into()),
machine: Some("threadripper".into()),
limit: None,
};
let runs = wh.query_bench_runs(&filter).unwrap();
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].machine, "threadripper");
}
#[test]
fn reopen_sees_previous_data() {
let dir = tempfile::tempdir().unwrap();
{
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 7.0)).unwrap();
}
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let runs = wh.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1);
}
#[test]
fn mcp_calls_round_trip_and_aggregate() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let t0 = 1_900_000_000_000_000i64;
wh.append_mcp_calls(&[
McpCall { ts_micros: t0, tool: "search".into(), status: "ok".into(), latency_ms: 12 },
McpCall { ts_micros: t0 + 1, tool: "search".into(), status: "err".into(), latency_ms: 8 },
McpCall { ts_micros: t0 + 2, tool: "search".into(), status: "ok".into(), latency_ms: 10 },
McpCall { ts_micros: t0 + 3, tool: "deps_of".into(), status: "ok".into(), latency_ms: 20 },
])
.unwrap();
wh.append_mcp_calls(&[McpCall {
ts_micros: t0 + 4,
tool: "deps_of".into(),
status: "ok".into(),
latency_ms: 30,
}])
.unwrap();
let stats = wh.query_mcp_stats().unwrap();
assert_eq!(stats.len(), 2);
assert_eq!(stats[0].tool, "search");
assert_eq!(stats[0].calls, 3);
assert_eq!(stats[0].errors, 1);
assert!((stats[0].avg_latency_ms - 10.0).abs() < 1e-9);
assert_eq!(stats[1].tool, "deps_of");
assert_eq!(stats[1].calls, 2);
assert_eq!(stats[1].errors, 0);
assert!((stats[1].avg_latency_ms - 25.0).abs() < 1e-9);
assert_eq!(stats[1].last_ts_micros, t0 + 4);
}
#[test]
#[ignore = "A/B benchmark; run with --release --ignored --nocapture"]
fn skade_ab_write_read() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
let env = |k: &str, d: usize| {
std::env::var(k).ok().and_then(|v| v.parse().ok()).unwrap_or(d)
};
let commits = env("AB_COMMITS", 200);
let rows = env("AB_ROWS", 1000);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("repo", DataType::Utf8, false),
Field::new("machine", DataType::Utf8, false),
Field::new("cores", DataType::Int32, false),
Field::new("seq", DataType::Int64, false),
Field::new("score", DataType::Float64, false),
]));
let columns: Vec<arrow::array::ArrayRef> = vec![
Arc::new(StringArray::from((0..rows).map(|i| format!("id-{i:08}")).collect::<Vec<_>>())),
Arc::new(StringArray::from(vec!["nornir"; rows])),
Arc::new(StringArray::from(vec!["oden"; rows])),
Arc::new(Int32Array::from(vec![32i32; rows])),
Arc::new(Int64Array::from((0..rows as i64).collect::<Vec<_>>())),
Arc::new(Float64Array::from((0..rows).map(|i| i as f64).collect::<Vec<_>>())),
];
let bytes_per_batch: usize = columns.iter().map(|c| c.get_array_memory_size()).sum();
let total_rows = (commits * rows) as f64;
let total_mb = (commits * bytes_per_batch) as f64 / 1e6;
let ice_schema = skade::arrow_to_iceberg(&arrow_schema).unwrap();
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
let mbs = |dt: Duration| total_mb / dt.as_secs_f64();
let rps = |dt: Duration| total_rows / dt.as_secs_f64();
let cps = |dt: Duration| commits as f64 / dt.as_secs_f64();
println!("\n══ skade A/B (commits={commits}, rows/batch={rows}, {total_mb:.1} MB, {total_rows:.0} rows) ══");
let mut a_writes: Vec<(&str, Duration)> = Vec::new();
let mut b_writes: Vec<(&str, Duration)> = Vec::new();
for (label, part_cols) in [
("unpartitioned", &[] as &[&str]),
("partitioned by `repo`", &["repo"] as &[&str]),
] {
let dir_a = tempfile::tempdir().unwrap();
let (a_write, a_read, a_rows) = rt.block_on(async {
let wh_dir = dir_a.path().join("warehouse");
std::fs::create_dir_all(&wh_dir).unwrap();
let catalog = RedbCatalogBuilder::default()
.db_path(dir_a.path().join("catalog.redb"))
.warehouse_location(format!("file://{}", wh_dir.canonicalize().unwrap().display()))
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await
.unwrap();
let ns = NamespaceIdent::new("nornir".to_string());
catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
create_partitioned_table_if_missing(&catalog, &ns, "ab", ice_schema.clone(), part_cols)
.await
.unwrap();
let ident = TableIdent::new(ns, "ab".into());
let table0 = catalog.load_table(&ident).await.unwrap();
let a_schema = Arc::new(schema_to_arrow_schema(table0.metadata().current_schema()).unwrap());
let batch_a = RecordBatch::try_new(a_schema, columns.clone()).unwrap();
let t0 = Instant::now();
for _ in 0..commits {
let table = catalog.load_table(&ident).await.unwrap();
append_batch(&catalog, table, batch_a.clone()).await.unwrap();
}
let write = t0.elapsed();
let table = catalog.load_table(&ident).await.unwrap();
let t1 = Instant::now();
let batches = scan_all(&table).await.unwrap();
let read = t1.elapsed();
(write, read, batches.iter().map(|b| b.num_rows()).sum::<usize>())
});
let dir_b = tempfile::tempdir().unwrap();
let (b_write, b_read, b_rows) = rt.block_on(async {
let wh = skade::open(dir_b.path()).await.unwrap();
let mut table = wh.create_partitioned_table("ab", &arrow_schema, part_cols).await.unwrap();
let batch_b = RecordBatch::try_new(table.arrow_schema().unwrap(), columns.clone()).unwrap();
let t0 = Instant::now();
for _ in 0..commits {
table.append(std::slice::from_ref(&batch_b)).await.unwrap();
}
let write = t0.elapsed();
let t1 = Instant::now();
let batches = table.read().await.unwrap();
let read = t1.elapsed();
(write, read, batches.iter().map(|b| b.num_rows()).sum::<usize>())
});
assert_eq!(a_rows, b_rows, "[{label}] both paths must read back the same row count");
assert_eq!(a_rows, commits * rows, "[{label}] all appended rows must be readable");
println!("\n── {label} ──");
println!("WRITE nornir(append_batch): {:>8.0} rows/s {:>6.1} commits/s {:>6.1} MB/s ({:?})",
rps(a_write), cps(a_write), mbs(a_write), a_write);
println!("WRITE skade(append): {:>8.0} rows/s {:>6.1} commits/s {:>6.1} MB/s ({:?})",
rps(b_write), cps(b_write), mbs(b_write), b_write);
println!(" → skade/nornir write ratio: {:.2}x ({})",
a_write.as_secs_f64() / b_write.as_secs_f64(),
if b_write <= a_write { "skade ≥ nornir ✓" } else { "skade slower" });
println!("READ nornir(scan_all): {:>8.0} rows/s ({:?})", rps(a_read), a_read);
println!("READ skade(read): {:>8.0} rows/s ({:?})", rps(b_read), b_read);
println!(" → skade/nornir read ratio: {:.2}x",
a_read.as_secs_f64() / b_read.as_secs_f64());
a_writes.push((label, a_write));
b_writes.push((label, b_write));
}
if a_writes.len() == 2 {
let ov = |v: &[(&str, Duration)]| v[1].1.as_secs_f64() / v[0].1.as_secs_f64();
println!("\n── partition overhead (partitioned ÷ unpartitioned write) ──");
println!(" nornir(append_batch): {:.2}x skade(append): {:.2}x", ov(&a_writes), ov(&b_writes));
}
}
}