use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use arrow::array::{
Array, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray,
TimestampMicrosecondArray,
};
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeZone, Utc};
use futures::TryStreamExt;
use iceberg::arrow::schema_to_arrow_schema;
use iceberg::expr::Reference;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::{Datum, PartitionSpec, Transform, UnboundPartitionSpec};
use iceberg::table::Table;
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use skade_katalog::{RedbCatalog, RedbCatalogBuilder};
use tokio::runtime::Runtime;
use uuid::Uuid;
use super::iceberg_schema;
use super::{BenchFilter, Warehouse};
use crate::bench::{BenchResult, BenchRun, TestOutcome};
// Name of the single Iceberg namespace this module works in; it is turned
// into a `NamespaceIdent` when a warehouse is opened.
const NAMESPACE: &str = "nornir";
// Table names private to this module (bench runs and their child tables).
const TABLE_BENCH_RUNS: &str = "bench_runs";
const TABLE_BENCH_RESULTS: &str = "bench_results";
const TABLE_BENCH_TELEMETRY: &str = "bench_telemetry";
const TABLE_TEST_OUTCOMES: &str = "test_outcomes";
// Table names shared with the rest of the crate. Every name below is passed
// to one of the `create_*_table_if_missing` helpers in `ensure_layout`.
pub(crate) const TABLE_TEST_RESULTS: &str = "test_results";
pub(crate) const TABLE_DEP_GRAPH_EDGES: &str = "dep_graph_edges";
pub(crate) const TABLE_RELEASE_LINEAGE: &str = "release_lineage";
pub(crate) const TABLE_RELEASE_EVENTS: &str = "release_events";
pub(crate) const TABLE_FUNNEL_EVENTS: &str = "funnel_events";
// Artifact tables come in (snapshots, blobs) pairs; `ensure_layout` creates
// each pair with the shared `artifact_snapshots` / `artifact_blobs` schemas.
pub(crate) const TABLE_TANTIVY_INDEX_SNAPSHOTS: &str = "tantivy_index_snapshots";
pub(crate) const TABLE_TANTIVY_INDEX_BLOBS: &str = "tantivy_index_blobs";
pub(crate) const TABLE_DWARF_SNAPSHOTS: &str = "dwarf_snapshots";
pub(crate) const TABLE_DWARF_BLOBS: &str = "dwarf_blobs";
pub(crate) const TABLE_GIMLI_SNAPSHOTS: &str = "gimli_snapshots";
pub(crate) const TABLE_GIMLI_BLOBS: &str = "gimli_blobs";
pub(crate) const TABLE_RUSTDOC_JSON_SNAPSHOTS: &str = "rustdoc_json_snapshots";
pub(crate) const TABLE_RUSTDOC_JSON_BLOBS: &str = "rustdoc_json_blobs";
pub(crate) const TABLE_EMBEDDINGS: &str = "embeddings";
pub(crate) const TABLE_EMBEDDING_SNAPSHOTS: &str = "embedding_snapshots";
pub(crate) const TABLE_EMBEDDING_MANIFEST: &str = "embedding_manifest";
pub(crate) const TABLE_PATH_DEP_AUDITS: &str = "path_dep_audits";
pub(crate) const TABLE_PATCH_STRIP_EVENTS: &str = "patch_strip_events";
pub(crate) const TABLE_PUBLISH_ATTEMPTS: &str = "publish_attempts";
pub(crate) const TABLE_CRATE_METADATA_CHECKS: &str = "crate_metadata_checks";
pub(crate) const TABLE_CRATE_TARBALL_STATS: &str = "crate_tarball_stats";
pub(crate) const TABLE_YANK_EVENTS: &str = "yank_events";
pub(crate) const TABLE_SEMVER_DIFFS: &str = "semver_diffs";
pub(crate) const TABLE_LINKS_DECLARATIONS: &str = "links_declarations";
pub(crate) const TABLE_RESOLVED_FEATURES: &str = "resolved_features";
pub(crate) const TABLE_MSRV_PROBES: &str = "msrv_probes";
pub(crate) const TABLE_TEST_IMPACTED_SELECTIONS: &str = "test_impacted_selections";
pub(crate) const TABLE_TEST_QUARANTINES: &str = "test_quarantines";
pub(crate) const TABLE_RELEASE_COMMITS: &str = "release_commits";
pub(crate) const TABLE_REGISTRY_MIRRORS: &str = "registry_mirrors";
pub(crate) const TABLE_VERSION_BUMP_PLANS: &str = "version_bump_plans";
pub(crate) const TABLE_VERSION_BUMP_TARGETS: &str = "version_bump_targets";
pub(crate) const TABLE_SYMBOL_FACTS: &str = "symbol_facts";
pub(crate) const TABLE_CALL_EDGES: &str = "call_edges";
pub(crate) const TABLE_FEATURE_GATE_FACTS: &str = "feature_gate_facts";
pub(crate) const TABLE_GIT_HEAT_FACTS: &str = "git_heat_facts";
pub(crate) const TABLE_WAREHOUSE_ACCESS_EDGES: &str = "warehouse_access_edges";
pub(crate) const TABLE_DOC_EXPORTS: &str = "doc_exports";
pub(crate) const TABLE_ARCHITECTURE_WIRING: &str = "architecture_wiring";
pub(crate) const TABLE_MCP_REQUESTS: &str = "mcp_requests";
pub(crate) const TABLE_VULN_FINDINGS: &str = "vuln_findings";
pub(crate) const TABLE_DOCS_INDEX_SNAPSHOTS: &str = "docs_index_snapshots";
pub(crate) const TABLE_DOCS_INDEX_BLOBS: &str = "docs_index_blobs";
pub(crate) const TABLE_SBOM_COMPONENTS: &str = "sbom_components";
pub(crate) const TABLE_KNOWLEDGE_SCANS: &str = "knowledge_scans";
pub(crate) const TABLE_AGENT_MODEL_RUNS: &str = "agent_model_runs";
pub(crate) const TABLE_VIZ_ACTIONS: &str = "viz_actions";
pub(crate) const TABLE_SURFACE_COVERAGE: &str = "surface_coverage";
/// Handle to the Iceberg warehouse: a redb-backed catalog together with the
/// tokio runtime used to drive the catalog's async API from synchronous
/// callers.
pub struct IcebergWarehouse {
/// Shared handle to the catalog loaded in `open_with_catalog`.
catalog: Arc<RedbCatalog>,
/// Runtime owned by this handle. Wrapped in `Option` so `Drop` can take it
/// and call `shutdown_background`.
rt: Option<Runtime>,
/// Namespace built from `NAMESPACE`; every table ident is created under it.
namespace: NamespaceIdent,
/// Root directory the warehouse was opened from.
root: std::path::PathBuf,
/// `Some` when the catalog was opened from a copied snapshot: holds the
/// temp directory containing that copy. `None` for a live open.
_snapshot: Option<tempfile::TempDir>,
}
/// Reports whether `err` looks like a "catalog is locked" failure.
///
/// Walks the whole cause chain and returns `true` as soon as any link's
/// rendered message contains one of the two marker phrases below.
pub fn is_catalog_lock_error(err: &anyhow::Error) -> bool {
    const LOCK_MARKERS: [&str; 2] = ["Database already open", "Cannot acquire lock"];
    for cause in err.chain() {
        let rendered = cause.to_string();
        if LOCK_MARKERS.iter().any(|marker| rendered.contains(*marker)) {
            return true;
        }
    }
    false
}
fn copy_catalog_consistent(live: &Path, dst: &Path) -> Result<()> {
copy_catalog_consistent_with(live, dst, |s, d| {
std::fs::copy(s, d)
.map(|_| ())
.with_context(|| format!("copy {} -> {}", s.display(), d.display()))
})
}
fn copy_catalog_consistent_with(
live: &Path,
dst: &Path,
mut copy: impl FnMut(&Path, &Path) -> Result<()>,
) -> Result<()> {
const MAX_ATTEMPTS: usize = 64;
let len_of = |p: &Path| -> Result<u64> {
Ok(std::fs::metadata(p)
.with_context(|| format!("stat {}", p.display()))?
.len())
};
for attempt in 0..MAX_ATTEMPTS {
let before = len_of(live)?;
copy(live, dst)?;
let after = len_of(live)?;
if before == after && len_of(dst)? >= after {
return Ok(());
}
std::thread::sleep(std::time::Duration::from_millis(2 + attempt as u64 / 4));
}
anyhow::bail!(
"snapshot of locked catalog {} never reached a size-stable window after \
{MAX_ATTEMPTS} attempts (catalog growing under a very hot writer)",
live.display()
)
}
impl Drop for IcebergWarehouse {
    /// Takes the runtime out of the handle (if still present) and shuts it
    /// down with `shutdown_background` instead of letting it drop normally.
    fn drop(&mut self) {
        let Some(runtime) = self.rt.take() else {
            return;
        };
        runtime.shutdown_background();
    }
}
impl IcebergWarehouse {
pub fn open(root: &Path) -> Result<Self> {
std::fs::create_dir_all(root)
.with_context(|| format!("create warehouse root {}", root.display()))?;
let warehouse_dir = root.join("warehouse");
std::fs::create_dir_all(&warehouse_dir)
.with_context(|| format!("create warehouse data dir {}", warehouse_dir.display()))?;
let db_path = root.join("catalog.redb");
let warehouse_uri = format!("file://{}", warehouse_dir.canonicalize()?.display());
Self::open_with_catalog(root, &db_path, &warehouse_uri, None)
}
pub fn open_read_only(root: &Path) -> Result<Self> {
match Self::open(root) {
Ok(wh) => Ok(wh),
Err(e) if is_catalog_lock_error(&e) => {
eprintln!(
"WARNING: nornir: catalog.redb at {} is locked by another process \
(a live nornir-server?); opening a read-only copied snapshot instead. \
Reads reflect the catalog as of now; this CLI cannot mutate the live \
warehouse while the server holds it.",
root.display()
);
Self::open_snapshot(root).with_context(|| {
format!(
"open read-only snapshot of locked catalog at {}",
root.display()
)
})
}
Err(e) => Err(e),
}
}
fn open_snapshot(root: &Path) -> Result<Self> {
let live_db = root.join("catalog.redb");
if !live_db.exists() {
anyhow::bail!(
"no catalog.redb at {} to snapshot",
live_db.display()
);
}
let tmp = tempfile::Builder::new()
.prefix("nornir-catalog-snapshot-")
.tempdir()
.context("create temp dir for catalog snapshot")?;
let snap_db = tmp.path().join("catalog.redb");
copy_catalog_consistent(&live_db, &snap_db).with_context(|| {
format!(
"copy locked catalog {} -> snapshot {}",
live_db.display(),
snap_db.display()
)
})?;
let warehouse_dir = root.join("warehouse");
let warehouse_uri = format!("file://{}", warehouse_dir.canonicalize()?.display());
Self::open_with_catalog(root, &snap_db, &warehouse_uri, Some(tmp))
}
fn open_with_catalog(
root: &Path,
db_path: &Path,
warehouse_uri: &str,
snapshot: Option<tempfile::TempDir>,
) -> Result<Self> {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("build tokio runtime")?;
let catalog = rt.block_on(async {
RedbCatalogBuilder::default()
.db_path(db_path)
.warehouse_location(warehouse_uri)
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await
})?;
let catalog = Arc::new(catalog);
let namespace = NamespaceIdent::new(NAMESPACE.to_string());
rt.block_on(ensure_layout(&catalog, &namespace))?;
Ok(Self {
catalog,
rt: Some(rt),
namespace,
root: root.to_path_buf(),
_snapshot: snapshot,
})
}
pub fn root(&self) -> &Path {
&self.root
}
pub fn is_snapshot(&self) -> bool {
self._snapshot.is_some()
}
pub(crate) fn table_ident(&self, name: &str) -> TableIdent {
TableIdent::new(self.namespace.clone(), name.to_string())
}
}
impl IcebergWarehouse {
    /// Borrows the shared catalog handle.
    pub fn catalog(&self) -> &Arc<RedbCatalog> {
        &self.catalog
    }

    /// Runs `fut` to completion on this warehouse's runtime and returns its
    /// output.
    ///
    /// # Panics
    /// Panics if the runtime has already been taken out of the handle.
    pub fn block_on<F: std::future::Future>(&self, fut: F) -> F::Output {
        let runtime = self.rt.as_ref().expect("rt present");
        runtime.block_on(fut)
    }
}
/// Makes sure the namespace `ns` and every warehouse table exist in `catalog`.
///
/// The namespace is created when `namespace_exists` reports it missing. Each
/// table is then created through `create_table_if_missing` (unpartitioned) or
/// `create_partitioned_table_if_missing` (identity-partitioned on the listed
/// columns); both helpers return early for tables that already exist.
///
/// # Errors
/// Returns the first error from a schema constructor or a catalog call;
/// tables created before that point stay created.
async fn ensure_layout(catalog: &RedbCatalog, ns: &NamespaceIdent) -> Result<()> {
if !catalog.namespace_exists(ns).await? {
catalog.create_namespace(ns, HashMap::new()).await?;
}
// Bench tables: runs and telemetry are partitioned by `repo`; results and
// test outcomes are unpartitioned.
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_BENCH_RUNS,
iceberg_schema::bench_runs()?,
&["repo"],
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_BENCH_RESULTS,
iceberg_schema::bench_results()?,
)
.await?;
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_BENCH_TELEMETRY,
iceberg_schema::bench_telemetry()?,
&["repo"],
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_TEST_OUTCOMES,
iceberg_schema::test_outcomes()?,
)
.await?;
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_TEST_RESULTS,
iceberg_schema::test_results()?,
&["repo"],
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_DEP_GRAPH_EDGES,
iceberg_schema::dep_graph_edges()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_RELEASE_LINEAGE,
iceberg_schema::release_lineage()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_RELEASE_EVENTS,
iceberg_schema::release_events()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_FUNNEL_EVENTS,
iceberg_schema::funnel_events()?,
)
.await?;
// Artifact (snapshots, blobs) pairs share two schemas: snapshot tables are
// partitioned by `repo`, blob tables by `snapshot_id`.
for (snap_name, blob_name) in [
(TABLE_TANTIVY_INDEX_SNAPSHOTS, TABLE_TANTIVY_INDEX_BLOBS),
(TABLE_DWARF_SNAPSHOTS, TABLE_DWARF_BLOBS),
(TABLE_GIMLI_SNAPSHOTS, TABLE_GIMLI_BLOBS),
(TABLE_RUSTDOC_JSON_SNAPSHOTS, TABLE_RUSTDOC_JSON_BLOBS),
(TABLE_DOCS_INDEX_SNAPSHOTS, TABLE_DOCS_INDEX_BLOBS),
] {
create_partitioned_table_if_missing(
catalog, ns, snap_name, iceberg_schema::artifact_snapshots()?, &["repo"],
)
.await?;
create_partitioned_table_if_missing(
catalog, ns, blob_name, iceberg_schema::artifact_blobs()?, &["snapshot_id"],
)
.await?;
}
create_table_if_missing(catalog, ns, TABLE_DOC_EXPORTS, iceberg_schema::doc_exports()?).await?;
create_table_if_missing(
catalog,
ns,
TABLE_ARCHITECTURE_WIRING,
iceberg_schema::architecture_wiring()?,
)
.await?;
create_table_if_missing(catalog, ns, TABLE_EMBEDDINGS, iceberg_schema::embeddings()?).await?;
create_table_if_missing(
catalog,
ns,
TABLE_EMBEDDING_SNAPSHOTS,
iceberg_schema::embedding_snapshots()?,
)
.await?;
create_table_if_missing(
catalog,
ns,
TABLE_EMBEDDING_MANIFEST,
iceberg_schema::embedding_manifest()?,
)
.await?;
// Unpartitioned tables. The array literal is built in full before the loop
// body runs, so a failing schema constructor here aborts before any table
// in this group is created.
for (name, schema) in [
(TABLE_PATH_DEP_AUDITS, iceberg_schema::path_dep_audits()?),
(TABLE_PATCH_STRIP_EVENTS, iceberg_schema::patch_strip_events()?),
(TABLE_PUBLISH_ATTEMPTS, iceberg_schema::publish_attempts()?),
(TABLE_CRATE_METADATA_CHECKS, iceberg_schema::crate_metadata_checks()?),
(TABLE_CRATE_TARBALL_STATS, iceberg_schema::crate_tarball_stats()?),
(TABLE_YANK_EVENTS, iceberg_schema::yank_events()?),
(TABLE_SEMVER_DIFFS, iceberg_schema::semver_diffs()?),
(TABLE_LINKS_DECLARATIONS, iceberg_schema::links_declarations()?),
(TABLE_RESOLVED_FEATURES, iceberg_schema::resolved_features()?),
(TABLE_MSRV_PROBES, iceberg_schema::msrv_probes()?),
(TABLE_TEST_IMPACTED_SELECTIONS, iceberg_schema::test_impacted_selections()?),
(TABLE_TEST_QUARANTINES, iceberg_schema::test_quarantines()?),
(TABLE_RELEASE_COMMITS, iceberg_schema::release_commits()?),
(TABLE_REGISTRY_MIRRORS, iceberg_schema::registry_mirrors()?),
(TABLE_VERSION_BUMP_PLANS, iceberg_schema::version_bump_plans()?),
(TABLE_VERSION_BUMP_TARGETS, iceberg_schema::version_bump_targets()?),
(TABLE_MCP_REQUESTS, iceberg_schema::mcp_requests()?),
(TABLE_VULN_FINDINGS, iceberg_schema::vuln_findings()?),
(TABLE_WAREHOUSE_ACCESS_EDGES, iceberg_schema::warehouse_access_edges()?),
(TABLE_SURFACE_COVERAGE, iceberg_schema::surface_coverage()?),
] {
create_table_if_missing(catalog, ns, name, schema).await?;
}
// Tables partitioned by `repo`.
for (name, schema) in [
(TABLE_SYMBOL_FACTS, iceberg_schema::symbol_facts()?),
(TABLE_CALL_EDGES, iceberg_schema::call_edges()?),
(TABLE_FEATURE_GATE_FACTS, iceberg_schema::feature_gate_facts()?),
(TABLE_GIT_HEAT_FACTS, iceberg_schema::git_heat_facts()?),
(TABLE_SBOM_COMPONENTS, iceberg_schema::sbom_components()?),
(TABLE_KNOWLEDGE_SCANS, iceberg_schema::knowledge_scans()?),
] {
create_partitioned_table_if_missing(catalog, ns, name, schema, &["repo"]).await?;
}
// Tables partitioned by their own id column (`run_id`, `session_id`).
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_AGENT_MODEL_RUNS,
iceberg_schema::agent_model_runs()?,
&["run_id"],
)
.await?;
create_partitioned_table_if_missing(
catalog,
ns,
TABLE_VIZ_ACTIONS,
iceberg_schema::viz_actions()?,
&["session_id"],
)
.await?;
Ok(())
}
/// Creates the unpartitioned table `name` in `ns` unless it already exists.
///
/// Thin wrapper over `create_partitioned_table_if_missing` with an empty
/// partition-column list.
async fn create_table_if_missing(
    catalog: &RedbCatalog,
    ns: &NamespaceIdent,
    name: &str,
    schema: iceberg::spec::Schema,
) -> Result<()> {
    let unpartitioned: &[&str] = &[];
    create_partitioned_table_if_missing(catalog, ns, name, schema, unpartitioned).await
}
/// Creates table `name` in `ns` with `schema` unless the catalog already has it.
///
/// When `partition_cols` is non-empty the table is created with an identity
/// partition spec over those columns; otherwise no partition spec is set.
///
/// # Errors
/// Propagates failures from the existence check, from building the partition
/// spec, and from `create_table`.
async fn create_partitioned_table_if_missing(
    catalog: &RedbCatalog,
    ns: &NamespaceIdent,
    name: &str,
    schema: iceberg::spec::Schema,
    partition_cols: &[&str],
) -> Result<()> {
    let ident = TableIdent::new(ns.clone(), name.to_string());
    let already_there = catalog.table_exists(&ident).await?;
    if already_there {
        return Ok(());
    }

    let creation = match partition_cols {
        [] => TableCreation::builder()
            .name(name.to_string())
            .schema(schema)
            .build(),
        cols => {
            // The spec borrows the schema, so build it before `schema` moves.
            let spec = identity_partition_spec(&schema, cols)?;
            TableCreation::builder()
                .name(name.to_string())
                .schema(schema)
                .partition_spec(spec)
                .build()
        }
    };
    catalog.create_table(ns, creation).await?;
    Ok(())
}
/// Builds an unbound partition spec that identity-partitions `schema` on each
/// of `columns`, naming every partition field after its source column.
///
/// # Errors
/// Propagates errors from adding a partition field or building the spec.
fn identity_partition_spec(
    schema: &iceberg::spec::Schema,
    columns: &[&str],
) -> Result<UnboundPartitionSpec> {
    let start = PartitionSpec::builder(Arc::new(schema.clone()));
    let builder = columns.iter().try_fold(start, |acc, col| {
        acc.add_partition_field(col, (*col).to_string(), Transform::Identity)
    })?;
    let bound = builder.build()?;
    Ok(bound.into_unbound())
}
impl IcebergWarehouse {
    /// Persists one bench run for `repo` and returns its freshly generated id.
    ///
    /// Always appends one row to `bench_runs`. Rows for `bench_results`,
    /// `bench_telemetry` and `test_outcomes` are appended only when the
    /// corresponding batch is non-empty.
    ///
    /// # Errors
    /// Fails when `run.machine` is blank, when the run's timestamp/date cannot
    /// be parsed, or when any table load, batch build or append fails. Tables
    /// are written one after another, so an error can leave earlier appends in
    /// place.
    pub async fn append_bench_run_async(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
        if run.machine.trim().is_empty() {
            anyhow::bail!("BenchRun.machine is required");
        }
        let run_id = Uuid::new_v4();
        let ts = resolve_timestamp(run)?;

        let runs_table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
        let runs_batch = build_bench_runs_batch(&runs_table, run_id, repo, ts, run)?;
        append_batch(&self.catalog, runs_table, runs_batch).await?;

        let results_table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
        let results_batch = build_bench_results_batch(&results_table, run_id, run)?;
        if results_batch.num_rows() > 0 {
            append_batch(&self.catalog, results_table, results_batch).await?;
        }

        let telemetry_table =
            self.catalog.load_table(&self.table_ident(TABLE_BENCH_TELEMETRY)).await?;
        let telemetry_batch = build_bench_telemetry_batch(&telemetry_table, run_id, repo, run)?;
        if telemetry_batch.num_rows() > 0 {
            append_batch(&self.catalog, telemetry_table, telemetry_batch).await?;
        }

        let outcomes_table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
        let outcomes_batch = build_test_outcomes_batch(&outcomes_table, run_id, run)?;
        if outcomes_batch.num_rows() > 0 {
            append_batch(&self.catalog, outcomes_table, outcomes_batch).await?;
        }

        Ok(run_id)
    }

    /// Looks up the snapshot id recorded in `table_name` for (`repo`, `git_sha`).
    ///
    /// Reads column 0 as the snapshot id and column 3 as the git sha. When
    /// several rows match, the last one in scan order wins. A table that fails
    /// to load is reported as `Ok(None)` rather than as an error.
    async fn artifact_snapshot_id_for(
        &self,
        table_name: &str,
        repo: &str,
        git_sha: &str,
    ) -> Result<Option<String>> {
        let Ok(table) = self.catalog.load_table(&self.table_ident(table_name)).await else {
            return Ok(None);
        };
        let batches = scan_repo_filtered(&table, Some(repo)).await?;
        let mut latest_match = None;
        for batch in &batches {
            let snapshot_ids = downcast::<StringArray>(batch, 0)?;
            let shas = downcast::<StringArray>(batch, 3)?;
            // Search from the back: the last matching row of a batch is the
            // one that would have overwritten every earlier match.
            let last_hit = (0..batch.num_rows()).rev().find(|&row| shas.value(row) == git_sha);
            if let Some(row) = last_hit {
                latest_match = Some(snapshot_ids.value(row).to_string());
            }
        }
        Ok(latest_match)
    }

    /// Snapshot id in the tantivy index snapshots table for (`repo`, `git_sha`).
    pub async fn index_snapshot_id_for(&self, repo: &str, git_sha: &str) -> Result<Option<String>> {
        self.artifact_snapshot_id_for(TABLE_TANTIVY_INDEX_SNAPSHOTS, repo, git_sha).await
    }

    /// Snapshot id in the dwarf snapshots table for (`repo`, `git_sha`).
    pub async fn dwarf_snapshot_id_for(&self, repo: &str, git_sha: &str) -> Result<Option<String>> {
        self.artifact_snapshot_id_for(TABLE_DWARF_SNAPSHOTS, repo, git_sha).await
    }

    /// Names of all tables in this warehouse's namespace, sorted ascending.
    pub fn table_names(&self) -> Result<Vec<String>> {
        self.block_on(async {
            let listed = self.catalog.list_tables(&self.namespace).await?;
            let mut names: Vec<String> =
                listed.iter().map(|ident| ident.name().to_string()).collect();
            names.sort();
            Ok(names)
        })
    }

    /// Returns up to `limit` rows of `table` rendered as strings, together
    /// with the column names taken from the first scanned batch.
    ///
    /// NOTE(review): with `limit == 0`, `scan_limited` reads the whole table
    /// but no rows are emitted here (only `columns` is filled) — confirm
    /// whether 0 was meant as "no limit".
    pub fn scan_preview(&self, table: &str, limit: usize) -> Result<TablePreview> {
        self.block_on(async {
            let loaded = self.catalog.load_table(&self.table_ident(table)).await?;
            let batches = scan_limited(&loaded, limit).await?;
            let mut columns: Vec<String> = Vec::new();
            let mut rows: Vec<Vec<String>> = Vec::new();
            for batch in &batches {
                if columns.is_empty() {
                    columns = batch
                        .schema()
                        .fields()
                        .iter()
                        .map(|field| field.name().to_string())
                        .collect();
                }
                // Take only as many rows as still fit under `limit`.
                let room = limit.saturating_sub(rows.len());
                let take = room.min(batch.num_rows());
                rows.extend((0..take).map(|row| {
                    (0..batch.num_columns())
                        .map(|col| cell_to_string(batch.column(col).as_ref(), row))
                        .collect::<Vec<String>>()
                }));
            }
            Ok(TablePreview { columns, rows })
        })
    }
}
/// String-rendered sample of a table, as produced by
/// `IcebergWarehouse::scan_preview`.
#[derive(Debug, Clone, Default)]
pub struct TablePreview {
/// Column names, taken from the schema of the first scanned batch.
pub columns: Vec<String>,
/// One entry per row; each row holds one rendered cell per column.
pub rows: Vec<Vec<String>>,
}
/// Renders the cell at `row` of `arr` as display text.
///
/// Nulls render as the empty string. String, 64-bit int, 32-bit int and
/// boolean arrays use their plain `to_string` form; 64-bit floats are printed
/// with four decimals. Any other array type renders as `<data type>`.
fn cell_to_string(arr: &dyn Array, row: usize) -> String {
    if arr.is_null(row) {
        return String::new();
    }
    let any = arr.as_any();
    // Probe the supported array types in a fixed order; first hit wins.
    let rendered = any
        .downcast_ref::<StringArray>()
        .map(|a| a.value(row).to_string())
        .or_else(|| any.downcast_ref::<Int64Array>().map(|a| a.value(row).to_string()))
        .or_else(|| any.downcast_ref::<Int32Array>().map(|a| a.value(row).to_string()))
        .or_else(|| any.downcast_ref::<Float64Array>().map(|a| format!("{:.4}", a.value(row))))
        .or_else(|| any.downcast_ref::<BooleanArray>().map(|a| a.value(row).to_string()));
    rendered.unwrap_or_else(|| format!("<{}>", arr.data_type()))
}
impl Warehouse for IcebergWarehouse {
    /// Blocking form of `append_bench_run_async`, run on the warehouse runtime.
    fn append_bench_run(&self, repo: &str, run: &BenchRun) -> Result<Uuid> {
        let pending = self.append_bench_run_async(repo, run);
        self.block_on(pending)
    }

    /// Blocking form of `query_bench_runs_async`, run on the warehouse runtime.
    fn query_bench_runs(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
        let pending = self.query_bench_runs_async(filter);
        self.block_on(pending)
    }
}
impl IcebergWarehouse {
    /// Loads bench runs matching `filter`, with their results and test
    /// outcomes attached.
    ///
    /// Runs come from `bench_runs` (pre-filtered by repo at scan time, then
    /// re-checked per row for repo and machine). `bench_results` and
    /// `test_outcomes` are scanned in full and rows are attached to the run
    /// with the same id; rows for unknown ids are ignored.
    ///
    /// The output is sorted ascending by timestamp with the run id as a
    /// tie-break, so runs that share a timestamp (for example date-only runs,
    /// which all resolve to midnight) come back in the same order on every
    /// call. When `filter.limit` is set, only the last `limit` runs of that
    /// ordering are kept.
    ///
    /// # Errors
    /// Fails on table load/scan errors, unexpected column types, unparsable
    /// run ids, or timestamps that cannot be converted.
    pub async fn query_bench_runs_async(&self, filter: &BenchFilter) -> Result<Vec<BenchRun>> {
        let mut by_id: HashMap<Uuid, BenchRun> = HashMap::new();
        let want_repo = filter.repo.as_deref();
        let want_machine = filter.machine.as_deref();

        let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RUNS)).await?;
        for batch in scan_repo_filtered(&table, want_repo).await? {
            let ids = downcast::<StringArray>(&batch, 0)?;
            let repos = downcast::<StringArray>(&batch, 1)?;
            let ts = downcast::<TimestampMicrosecondArray>(&batch, 2)?;
            let dates = downcast::<StringArray>(&batch, 3)?;
            let versions = downcast::<StringArray>(&batch, 4)?;
            let machines = downcast::<StringArray>(&batch, 5)?;
            let cores = downcast::<Int32Array>(&batch, 6)?;
            for i in 0..batch.num_rows() {
                // The scan filter is re-checked row by row.
                if want_repo.map_or(false, |want| repos.value(i) != want) {
                    continue;
                }
                if want_machine.map_or(false, |want| machines.value(i) != want) {
                    continue;
                }
                let uid = Uuid::parse_str(ids.value(i))?;
                let ts_dt = Utc
                    .timestamp_micros(ts.value(i))
                    .single()
                    .context("invalid micro timestamp")?;
                by_id.insert(
                    uid,
                    BenchRun {
                        date: dates.value(i).to_string(),
                        timestamp: Some(ts_dt.to_rfc3339()),
                        version: versions.value(i).to_string(),
                        machine: machines.value(i).to_string(),
                        // Negative stored values are clamped to zero.
                        cores: cores.value(i).max(0) as u32,
                        results: Vec::new(),
                        tests: Vec::new(),
                    },
                );
            }
        }

        let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_RESULTS)).await?;
        for batch in scan_all(&table).await? {
            let ids = downcast::<StringArray>(&batch, 0)?;
            let names = downcast::<StringArray>(&batch, 1)?;
            let metrics = downcast::<StringArray>(&batch, 2)?;
            let values = downcast::<Float64Array>(&batch, 3)?;
            for i in 0..batch.num_rows() {
                let uid = Uuid::parse_str(ids.value(i))?;
                let Some(run) = by_id.get_mut(&uid) else {
                    continue;
                };
                // One `BenchResult` per bench name; each row adds one metric to it.
                let name = names.value(i);
                let slot = match run.results.iter().position(|r| r.name == name) {
                    Some(idx) => idx,
                    None => {
                        run.results.push(BenchResult {
                            name: name.to_string(),
                            metrics: Default::default(),
                        });
                        run.results.len() - 1
                    }
                };
                run.results[slot].metrics.insert(
                    metrics.value(i).to_string(),
                    serde_json::Value::from(values.value(i)),
                );
            }
        }

        let table = self.catalog.load_table(&self.table_ident(TABLE_TEST_OUTCOMES)).await?;
        for batch in scan_all(&table).await? {
            let ids = downcast::<StringArray>(&batch, 0)?;
            let names = downcast::<StringArray>(&batch, 1)?;
            let passed = downcast::<BooleanArray>(&batch, 2)?;
            let durations = downcast::<Float64Array>(&batch, 3)?;
            let messages = downcast::<StringArray>(&batch, 4)?;
            for i in 0..batch.num_rows() {
                let uid = Uuid::parse_str(ids.value(i))?;
                let Some(run) = by_id.get_mut(&uid) else {
                    continue;
                };
                run.tests.push(TestOutcome {
                    name: names.value(i).to_string(),
                    passed: passed.value(i),
                    duration_ms: if durations.is_null(i) { None } else { Some(durations.value(i)) },
                    message: if messages.is_null(i) {
                        None
                    } else {
                        Some(messages.value(i).to_string())
                    },
                });
            }
        }

        // HashMap iteration order is arbitrary, so sort on (timestamp, id) to
        // give runs with equal timestamps a stable position.
        let mut keyed: Vec<(Uuid, BenchRun)> = by_id.into_iter().collect();
        keyed.sort_by(|(id_a, a), (id_b, b)| {
            a.timestamp.cmp(&b.timestamp).then_with(|| id_a.cmp(id_b))
        });
        let mut out: Vec<BenchRun> = keyed.into_iter().map(|(_, run)| run).collect();
        if let Some(n) = filter.limit {
            // Keep the newest `n` runs by dropping from the front.
            let drop_n = out.len().saturating_sub(n);
            out.drain(..drop_n);
        }
        Ok(out)
    }
}
impl IcebergWarehouse {
    /// Blocking form of `query_bench_telemetry_async`, run on the warehouse
    /// runtime.
    pub fn query_bench_telemetry(&self, repo: Option<&str>) -> Result<Vec<BenchTelemetryRow>> {
        self.block_on(self.query_bench_telemetry_async(repo))
    }

    /// Reads `bench_telemetry` rows, restricted to `repo` when one is given.
    ///
    /// The repo restriction is applied as a scan filter and checked again per
    /// row. Negative stored core counts are clamped to zero.
    ///
    /// # Errors
    /// Fails on table load/scan errors or unexpected column types.
    pub async fn query_bench_telemetry_async(
        &self,
        repo: Option<&str>,
    ) -> Result<Vec<BenchTelemetryRow>> {
        let table = self.catalog.load_table(&self.table_ident(TABLE_BENCH_TELEMETRY)).await?;
        let repo_matches = |candidate: &str| repo.map_or(true, |want| candidate == want);

        let mut rows = Vec::new();
        for batch in scan_repo_filtered(&table, repo).await? {
            let run_ids = downcast::<StringArray>(&batch, 0)?;
            let repos = downcast::<StringArray>(&batch, 1)?;
            let benches = downcast::<StringArray>(&batch, 2)?;
            let n_cores = downcast::<Int32Array>(&batch, 3)?;
            let cpu_avg = downcast::<Float64Array>(&batch, 4)?;
            let cpu_max = downcast::<Float64Array>(&batch, 5)?;
            let busy_avg = downcast::<Float64Array>(&batch, 6)?;
            let busy_max = downcast::<Int32Array>(&batch, 7)?;
            let mem_peak = downcast::<Float64Array>(&batch, 8)?;
            let mem_pct = downcast::<Float64Array>(&batch, 9)?;
            let elapsed = downcast::<Float64Array>(&batch, 10)?;

            let kept = (0..batch.num_rows())
                .filter(|&i| repo_matches(repos.value(i)))
                .map(|i| BenchTelemetryRow {
                    run_id: run_ids.value(i).to_string(),
                    repo: repos.value(i).to_string(),
                    bench: benches.value(i).to_string(),
                    n_cores: n_cores.value(i).max(0) as u32,
                    cpu_pct_avg: cpu_avg.value(i),
                    cpu_pct_max: cpu_max.value(i),
                    cores_busy_avg: busy_avg.value(i),
                    cores_busy_max: busy_max.value(i).max(0) as u32,
                    mem_peak_mb: mem_peak.value(i),
                    mem_pct_max: mem_pct.value(i),
                    elapsed_ms: elapsed.value(i),
                });
            rows.extend(kept);
        }
        Ok(rows)
    }
}
/// One row of the `bench_telemetry` table as returned by
/// `query_bench_telemetry_async`. Fields are listed in the same order as the
/// table columns they are read from (column 0 through column 10).
#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
pub struct BenchTelemetryRow {
/// Id of the bench run this row belongs to, in string form.
pub run_id: String,
/// Repository the run was recorded for.
pub repo: String,
/// Name of the benchmark.
pub bench: String,
/// Stored as a 32-bit int; negative values are clamped to 0 on read.
pub n_cores: u32,
pub cpu_pct_avg: f64,
pub cpu_pct_max: f64,
pub cores_busy_avg: f64,
/// Stored as a 32-bit int; negative values are clamped to 0 on read.
pub cores_busy_max: u32,
pub mem_peak_mb: f64,
pub mem_pct_max: f64,
pub elapsed_ms: f64,
}
/// Appends a single record batch to `table` through `skade::append`.
pub(crate) async fn append_batch(catalog: &RedbCatalog, table: Table, batch: RecordBatch) -> Result<()> {
    // `skade::append` takes a slice of batches; view the one batch as a slice.
    let batches: &[RecordBatch] = std::slice::from_ref(&batch);
    skade::append(catalog, &table, batches).await?;
    Ok(())
}
/// Brings `table`'s current schema up to `canonical` when top-level columns
/// are missing.
///
/// Columns are compared by name only, at the top level of the schema. If no
/// canonical column is missing the input `table` is returned untouched.
/// Otherwise a new schema built from the canonical fields is committed as the
/// current schema, a notice is printed to stderr, and the table is reloaded
/// from the catalog.
///
/// # Errors
/// Propagates failures from building the schema, committing the update, or
/// reloading the table.
pub(crate) async fn ensure_table_schema(
catalog: &RedbCatalog,
ident: &TableIdent,
table: Table,
canonical: &iceberg::spec::Schema,
) -> Result<Table> {
use std::collections::HashSet;
use iceberg::TableUpdate;
let current = table.metadata().current_schema();
// Names of the columns the table has today.
let have: HashSet<&str> =
current.as_struct().fields().iter().map(|f| f.name.as_str()).collect();
// Canonical columns the table lacks, in canonical order.
let missing: Vec<&str> = canonical
.as_struct()
.fields()
.iter()
.map(|f| f.name.as_str())
.filter(|n| !have.contains(n))
.collect();
if missing.is_empty() {
return Ok(table);
}
use iceberg::spec::Schema;
// NOTE(review): assumes current id + 1 is acceptable as the new schema id —
// confirm how the catalog assigns ids when older schemas exist.
let new_schema_id = current.schema_id() + 1;
// The evolved schema carries all canonical fields, not just the missing ones.
let evolved = Schema::builder()
.with_schema_id(new_schema_id)
.with_fields(canonical.as_struct().fields().to_vec())
.build()?;
// NOTE(review): `schema_id: -1` presumably selects the schema added by the
// preceding `AddSchema` in this same commit — confirm against the Iceberg
// table-update semantics.
let updates = vec![
TableUpdate::AddSchema { schema: evolved },
TableUpdate::SetCurrentSchema { schema_id: -1 },
];
eprintln!(
"nornir: evolving warehouse table `{}` schema — adding column(s) [{}] \
(Iceberg add-column migration)",
ident.name(),
missing.join(", "),
);
// Committed with an empty requirements list.
catalog.commit_table(ident.clone(), Vec::new(), updates).await?;
Ok(catalog.load_table(ident).await?)
}
/// Reads every record batch of `table` via `skade::read_all`.
async fn scan_all(table: &Table) -> Result<Vec<RecordBatch>> {
    let batches = skade::read_all(table).await?;
    Ok(batches)
}
/// Streams batches from `table` until at least `limit` rows have been
/// collected or the stream ends.
///
/// `limit == 0` falls back to `scan_all`. The scan batch size is `limit`
/// clamped to 256..=8192. Whole batches are returned, so the total row count
/// may exceed `limit`.
async fn scan_limited(table: &Table, limit: usize) -> Result<Vec<RecordBatch>> {
    use futures::StreamExt;

    if limit == 0 {
        return scan_all(table).await;
    }

    let scan = table
        .scan()
        .select_all()
        .with_batch_size(Some(limit.clamp(256, 8192)))
        .build()?;
    let mut stream = scan.to_arrow().await?;

    let mut collected = Vec::new();
    let mut rows_seen = 0usize;
    while rows_seen < limit {
        let Some(next) = stream.next().await else {
            break;
        };
        let batch = next?;
        rows_seen += batch.num_rows();
        collected.push(batch);
    }
    Ok(collected)
}
/// Scans `table` and collects all batches, adding a `repo == <value>` filter
/// to the scan when `repo` is given.
async fn scan_repo_filtered(table: &Table, repo: Option<&str>) -> Result<Vec<RecordBatch>> {
    let builder = match repo {
        Some(wanted) => table
            .scan()
            .with_filter(Reference::new("repo").equal_to(Datum::string(wanted))),
        None => table.scan(),
    };
    let stream = builder.build()?.to_arrow().await?;
    Ok(stream.try_collect::<Vec<RecordBatch>>().await?)
}
/// Borrows column `idx` of `batch` as the concrete array type `T`.
///
/// # Errors
/// Returns an error naming the column index and its actual data type when the
/// column is not a `T`.
fn downcast<T: 'static>(batch: &RecordBatch, idx: usize) -> Result<&T> {
    let column = batch.column(idx);
    match column.as_any().downcast_ref::<T>() {
        Some(typed) => Ok(typed),
        None => Err(anyhow!("column {idx} has unexpected type {:?}", column.data_type())),
    }
}
/// Determines the UTC instant of a bench run.
///
/// Uses `run.timestamp` (RFC 3339) when present; otherwise parses `run.date`
/// as `YYYY-MM-DD` and takes midnight UTC of that day.
///
/// # Errors
/// Fails when the chosen field cannot be parsed.
fn resolve_timestamp(run: &BenchRun) -> Result<DateTime<Utc>> {
    match &run.timestamp {
        Some(raw) => DateTime::parse_from_rfc3339(raw)
            .map(|parsed| parsed.with_timezone(&Utc))
            .with_context(|| format!("parse timestamp {raw}")),
        None => {
            let day = NaiveDate::parse_from_str(&run.date, "%Y-%m-%d")
                .with_context(|| format!("parse date {}", run.date))?;
            // 00:00:00 is always a valid time of day, so this cannot be `None`.
            let midnight: NaiveDateTime = day.and_hms_opt(0, 0, 0).unwrap();
            Ok(Utc.from_utc_datetime(&midnight))
        }
    }
}
/// Builds the single-row `bench_runs` batch for one run, using the Arrow form
/// of `table`'s current schema.
///
/// Column order: run id, repo, timestamp (microseconds, `+00:00`), date,
/// version, machine, cores.
fn build_bench_runs_batch(
    table: &Table,
    run_id: Uuid,
    repo: &str,
    ts: DateTime<Utc>,
    run: &BenchRun,
) -> Result<RecordBatch> {
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    let id_col = StringArray::from(vec![run_id.to_string()]);
    let repo_col = StringArray::from(vec![repo.to_string()]);
    let ts_col =
        TimestampMicrosecondArray::from(vec![ts.timestamp_micros()]).with_timezone("+00:00");
    let date_col = StringArray::from(vec![run.date.clone()]);
    let version_col = StringArray::from(vec![run.version.clone()]);
    let machine_col = StringArray::from(vec![run.machine.clone()]);
    let cores_col = Int32Array::from(vec![run.cores as i32]);

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(id_col),
        Arc::new(repo_col),
        Arc::new(ts_col),
        Arc::new(date_col),
        Arc::new(version_col),
        Arc::new(machine_col),
        Arc::new(cores_col),
    ];
    let batch = RecordBatch::try_new(arrow_schema, columns)?;
    Ok(batch)
}
/// Builds the `bench_results` batch for one run: one row per
/// (bench name, metric) pair.
///
/// Metrics whose key starts with the telemetry prefix are skipped, as are
/// values that are not JSON numbers or cannot be represented as `f64`.
fn build_bench_results_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // (bench name, metric key, value) for every metric that survives the filters.
    let rows: Vec<(&str, &str, f64)> = run
        .results
        .iter()
        .flat_map(|result| {
            (&result.metrics).into_iter().filter_map(move |(key, value)| {
                if key.starts_with(crate::bench::telemetry::TELEM_PREFIX) {
                    return None;
                }
                let serde_json::Value::Number(number) = value else {
                    return None;
                };
                number
                    .as_f64()
                    .map(|as_float| (result.name.as_str(), key.as_str(), as_float))
            })
        })
        .collect();

    let id_col = StringArray::from(vec![run_id.to_string(); rows.len()]);
    let name_col = StringArray::from(rows.iter().map(|row| row.0).collect::<Vec<&str>>());
    let metric_col = StringArray::from(rows.iter().map(|row| row.1).collect::<Vec<&str>>());
    let value_col = Float64Array::from(rows.iter().map(|row| row.2).collect::<Vec<f64>>());

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(id_col),
        Arc::new(name_col),
        Arc::new(metric_col),
        Arc::new(value_col),
    ];
    Ok(RecordBatch::try_new(arrow_schema, columns)?)
}
/// Builds the `bench_telemetry` batch for one run: one row per bench result
/// for which `telemetry::from_metrics` yields a value.
///
/// Column order: run id, repo, bench, n_cores, cpu avg, cpu max, busy avg,
/// busy max, mem peak, mem pct, elapsed.
fn build_bench_telemetry_batch(
    table: &Table,
    run_id: Uuid,
    repo: &str,
    run: &BenchRun,
) -> Result<RecordBatch> {
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Pair each bench name with its telemetry; results without telemetry drop out.
    let samples: Vec<_> = run
        .results
        .iter()
        .filter_map(|result| {
            crate::bench::telemetry::from_metrics(&result.metrics)
                .map(|telemetry| (result.name.clone(), telemetry))
        })
        .collect();
    let count = samples.len();

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![run_id.to_string(); count])),
        Arc::new(StringArray::from(vec![repo.to_string(); count])),
        Arc::new(StringArray::from(
            samples.iter().map(|(bench, _)| bench.clone()).collect::<Vec<_>>(),
        )),
        Arc::new(Int32Array::from(
            samples.iter().map(|(_, t)| t.n_cores as i32).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            samples.iter().map(|(_, t)| t.cpu_pct_avg).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            samples.iter().map(|(_, t)| t.cpu_pct_max).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            samples.iter().map(|(_, t)| t.cores_busy_avg).collect::<Vec<_>>(),
        )),
        Arc::new(Int32Array::from(
            samples.iter().map(|(_, t)| t.cores_busy_max as i32).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            samples.iter().map(|(_, t)| t.mem_peak_mb).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            samples.iter().map(|(_, t)| t.mem_pct_max).collect::<Vec<_>>(),
        )),
        Arc::new(Float64Array::from(
            samples.iter().map(|(_, t)| t.elapsed_ms).collect::<Vec<_>>(),
        )),
    ];
    Ok(RecordBatch::try_new(arrow_schema, columns)?)
}
/// Builds the `test_outcomes` batch for one run: one row per entry in
/// `run.tests`.
///
/// The first five columns carry run id, test name, pass flag, duration and
/// message. The remaining five columns (string, string, int32, string, bool)
/// are written as all-null.
fn build_test_outcomes_batch(table: &Table, run_id: Uuid, run: &BenchRun) -> Result<RecordBatch> {
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let tests = &run.tests;
    let count = tests.len();

    // All-null filler columns of the given element type.
    let null_strings = || StringArray::from(vec![None::<String>; count]);
    let null_ints = || Int32Array::from(vec![None::<i32>; count]);
    let null_bools = || BooleanArray::from(vec![None::<bool>; count]);

    let id_col = StringArray::from(vec![run_id.to_string(); count]);
    let name_col = StringArray::from(tests.iter().map(|t| t.name.clone()).collect::<Vec<String>>());
    let passed_col = BooleanArray::from(tests.iter().map(|t| t.passed).collect::<Vec<bool>>());
    let duration_col =
        Float64Array::from(tests.iter().map(|t| t.duration_ms).collect::<Vec<Option<f64>>>());
    let message_col =
        StringArray::from(tests.iter().map(|t| t.message.clone()).collect::<Vec<Option<String>>>());

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(id_col),
        Arc::new(name_col),
        Arc::new(passed_col),
        Arc::new(duration_col),
        Arc::new(message_col),
        Arc::new(null_strings()),
        Arc::new(null_strings()),
        Arc::new(null_ints()),
        Arc::new(null_strings()),
        Arc::new(null_bools()),
    ];
    Ok(RecordBatch::try_new(arrow_schema, columns)?)
}
// Empty function with no callers in the visible code; the attribute silences
// the resulting dead-code warning.
// NOTE(review): the name suggests it once anchored otherwise-unused imports —
// confirm whether it can be removed.
#[allow(dead_code)]
fn _unused_imports_anchor() {}
impl IcebergWarehouse {
/// Blocking form of `append_symbol_scan_async`, run on the warehouse runtime.
pub fn append_symbol_scan(
    &self,
    scan: &crate::knowledge::symbols::SymbolScan,
) -> Result<()> {
    let pending = self.append_symbol_scan_async(scan);
    self.block_on(pending)
}
/// Blocking wrapper around `dep_graph::record_dep_graph` for this warehouse,
/// run on the warehouse runtime; returns the id that function yields.
pub fn record_dep_graph(
    &self,
    workspace_name: &str,
    graph: &crate::warehouse::dep_graph::WorkspaceGraph,
) -> Result<uuid::Uuid> {
    let pending = crate::warehouse::dep_graph::record_dep_graph(self, workspace_name, graph);
    self.block_on(pending)
}
/// Appends the contents of one symbol scan to the warehouse.
///
/// Up to three tables are written — `TABLE_SYMBOL_FACTS`, `TABLE_CALL_EDGES` and
/// `TABLE_FEATURE_GATE_FACTS` — one append per non-empty collection in `scan`; empty
/// collections are skipped. Every row starts with the scan's snapshot id, its timestamp
/// (microseconds, `+00:00`) and its repo.
///
/// # Errors
/// Fails if a table cannot be loaded, its schema cannot be converted to Arrow, the record
/// batch cannot be assembled, or the append itself fails.
pub async fn append_symbol_scan_async(
    &self,
    scan: &crate::knowledge::symbols::SymbolScan,
) -> Result<()> {
    let scan_ts = scan.ts.timestamp_micros();
    let snapshot = scan.snapshot_id.to_string();

    // The three leading columns are identical for every table written below.
    let shared_columns = |rows: usize| {
        let lead: Vec<Arc<dyn Array>> = vec![
            Arc::new(StringArray::from(vec![snapshot.clone(); rows])),
            Arc::new(TimestampMicrosecondArray::from(vec![scan_ts; rows]).with_timezone("+00:00")),
            Arc::new(StringArray::from(vec![scan.repo.clone(); rows])),
        ];
        lead
    };

    let symbols = &scan.symbols;
    if !symbols.is_empty() {
        let table = self.catalog.load_table(&self.table_ident(TABLE_SYMBOL_FACTS)).await?;
        let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        let mut columns = shared_columns(symbols.len());
        let facts: Vec<Arc<dyn Array>> = vec![
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.crate_name.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.module_path.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.item_kind.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.item_name.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.visibility.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.file.clone()).collect::<Vec<_>>())),
            Arc::new(Int32Array::from(symbols.iter().map(|sym| sym.line as i32).collect::<Vec<_>>())),
            Arc::new(Int32Array::from(symbols.iter().map(|sym| sym.doc_lines as i32).collect::<Vec<_>>())),
            Arc::new(StringArray::from(symbols.iter().map(|sym| sym.signature.clone()).collect::<Vec<_>>())),
        ];
        columns.extend(facts);
        let batch = RecordBatch::try_new(arrow_schema, columns)?;
        append_batch(&self.catalog, table, batch).await?;
    }

    let calls = &scan.calls;
    if !calls.is_empty() {
        let table = self.catalog.load_table(&self.table_ident(TABLE_CALL_EDGES)).await?;
        let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        let mut columns = shared_columns(calls.len());
        let edges: Vec<Arc<dyn Array>> = vec![
            Arc::new(StringArray::from(calls.iter().map(|call| call.crate_name.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(calls.iter().map(|call| call.caller_path.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(calls.iter().map(|call| call.callee_ident.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(calls.iter().map(|call| call.call_kind.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(calls.iter().map(|call| call.file.clone()).collect::<Vec<_>>())),
            Arc::new(Int32Array::from(calls.iter().map(|call| call.line as i32).collect::<Vec<_>>())),
        ];
        columns.extend(edges);
        let batch = RecordBatch::try_new(arrow_schema, columns)?;
        append_batch(&self.catalog, table, batch).await?;
    }

    let features = &scan.features;
    if !features.is_empty() {
        let table = self.catalog.load_table(&self.table_ident(TABLE_FEATURE_GATE_FACTS)).await?;
        let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
        let mut columns = shared_columns(features.len());
        let gates: Vec<Arc<dyn Array>> = vec![
            Arc::new(StringArray::from(features.iter().map(|gate| gate.crate_name.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(features.iter().map(|gate| gate.module_path.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(features.iter().map(|gate| gate.item_name.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(features.iter().map(|gate| gate.cfg_expr.clone()).collect::<Vec<_>>())),
            Arc::new(StringArray::from(features.iter().map(|gate| gate.file.clone()).collect::<Vec<_>>())),
            Arc::new(Int32Array::from(features.iter().map(|gate| gate.line as i32).collect::<Vec<_>>())),
        ];
        columns.extend(gates);
        let batch = RecordBatch::try_new(arrow_schema, columns)?;
        append_batch(&self.catalog, table, batch).await?;
    }

    Ok(())
}
/// Blocking counterpart of [`Self::append_git_heat_scan_async`], run on the warehouse's
/// own runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn append_git_heat_scan(
    &self,
    scan: &crate::knowledge::git_heat::GitHeatScan,
) -> Result<()> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.append_git_heat_scan_async(scan);
    runtime.block_on(pending)
}
/// Appends one row per file of `scan` to `TABLE_GIT_HEAT_FACTS`.
///
/// A scan without files is a no-op. Each row carries the scan's snapshot id, timestamp
/// and repo, followed by the file's path, commit counters, author count and last-commit
/// timestamp (microseconds, `+00:00`).
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to Arrow, the
/// record batch cannot be assembled, or the append fails.
pub async fn append_git_heat_scan_async(
    &self,
    scan: &crate::knowledge::git_heat::GitHeatScan,
) -> Result<()> {
    let files = &scan.files;
    if files.is_empty() {
        return Ok(());
    }

    let scan_ts = scan.ts.timestamp_micros();
    let snapshot = scan.snapshot_id.to_string();
    let table = self.catalog.load_table(&self.table_ident(TABLE_GIT_HEAT_FACTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let rows = files.len();

    // Per-file values, gathered column by column.
    let paths: Vec<_> = files.iter().map(|f| f.file.clone()).collect();
    let commits_total: Vec<_> = files.iter().map(|f| f.commits_total).collect();
    let commits_30d: Vec<_> = files.iter().map(|f| f.commits_30d).collect();
    let commits_90d: Vec<_> = files.iter().map(|f| f.commits_90d).collect();
    let authors_total: Vec<_> = files.iter().map(|f| f.authors_total).collect();
    let last_commit: Vec<_> = files.iter().map(|f| f.last_commit_ts.timestamp_micros()).collect();

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![snapshot; rows])),
        Arc::new(TimestampMicrosecondArray::from(vec![scan_ts; rows]).with_timezone("+00:00")),
        Arc::new(StringArray::from(vec![scan.repo.clone(); rows])),
        Arc::new(StringArray::from(paths)),
        Arc::new(Int64Array::from(commits_total)),
        Arc::new(Int64Array::from(commits_30d)),
        Arc::new(Int64Array::from(commits_90d)),
        Arc::new(Int64Array::from(authors_total)),
        Arc::new(TimestampMicrosecondArray::from(last_commit).with_timezone("+00:00")),
    ];
    let batch = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(&self.catalog, table, batch).await?;
    Ok(())
}
/// Appends `calls` to `TABLE_MCP_REQUESTS`, one row per call.
///
/// Columns are written in the order timestamp (microseconds, `+00:00`), tool, status,
/// latency in milliseconds. An empty slice is a no-op.
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to Arrow, the
/// record batch cannot be assembled, or the append fails.
pub async fn append_mcp_calls_async(&self, calls: &[McpCall]) -> Result<()> {
    if calls.is_empty() {
        return Ok(());
    }
    let table = self.catalog.load_table(&self.table_ident(TABLE_MCP_REQUESTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Split the rows into columns in a single pass.
    let mut stamps = Vec::with_capacity(calls.len());
    let mut tools = Vec::with_capacity(calls.len());
    let mut statuses = Vec::with_capacity(calls.len());
    let mut latencies = Vec::with_capacity(calls.len());
    for call in calls {
        stamps.push(call.ts_micros);
        tools.push(call.tool.clone());
        statuses.push(call.status.clone());
        latencies.push(call.latency_ms);
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(TimestampMicrosecondArray::from(stamps).with_timezone("+00:00")),
        Arc::new(StringArray::from(tools)),
        Arc::new(StringArray::from(statuses)),
        Arc::new(Int64Array::from(latencies)),
    ];
    let batch = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(&self.catalog, table, batch).await?;
    Ok(())
}
/// Blocking counterpart of [`Self::append_mcp_calls_async`], run on the warehouse's own
/// runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn append_mcp_calls(&self, calls: &[McpCall]) -> Result<()> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.append_mcp_calls_async(calls);
    runtime.block_on(pending)
}
/// Aggregates every row of `TABLE_MCP_REQUESTS` into one [`McpToolStat`] per tool.
///
/// A call counts as an error whenever its status is anything other than `"ok"`. The
/// result is ordered by call count, busiest tool first; ties are broken by tool name in
/// ascending order.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a column does not have the
/// expected Arrow type.
pub async fn query_mcp_stats_async(&self) -> Result<Vec<McpToolStat>> {
    /// Running totals for a single tool.
    struct Tally {
        count: u64,
        failures: u64,
        latency_total: i128,
        newest: i64,
    }

    let table = self.catalog.load_table(&self.table_ident(TABLE_MCP_REQUESTS)).await?;
    let batches = scan_all(&table).await?;

    let mut per_tool: std::collections::HashMap<String, Tally> = std::collections::HashMap::new();
    for batch in &batches {
        let stamp_col = downcast::<TimestampMicrosecondArray>(batch, 0)?;
        let tool_col = downcast::<StringArray>(batch, 1)?;
        let status_col = downcast::<StringArray>(batch, 2)?;
        let latency_col = downcast::<Int64Array>(batch, 3)?;
        for row in 0..batch.num_rows() {
            let tally = per_tool
                .entry(tool_col.value(row).to_string())
                .or_insert_with(|| Tally { count: 0, failures: 0, latency_total: 0, newest: i64::MIN });
            tally.count += 1;
            tally.failures += u64::from(status_col.value(row) != "ok");
            tally.latency_total += latency_col.value(row) as i128;
            tally.newest = tally.newest.max(stamp_col.value(row));
        }
    }

    let mut stats: Vec<McpToolStat> = Vec::with_capacity(per_tool.len());
    for (tool, tally) in per_tool {
        let avg_latency_ms = if tally.count > 0 {
            tally.latency_total as f64 / tally.count as f64
        } else {
            0.0
        };
        stats.push(McpToolStat {
            tool,
            calls: tally.count,
            errors: tally.failures,
            avg_latency_ms,
            last_ts_micros: tally.newest,
        });
    }
    stats.sort_by(|lhs, rhs| rhs.calls.cmp(&lhs.calls).then_with(|| lhs.tool.cmp(&rhs.tool)));
    Ok(stats)
}
/// Blocking counterpart of [`Self::query_mcp_stats_async`], run on the warehouse's own
/// runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn query_mcp_stats(&self) -> Result<Vec<McpToolStat>> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.query_mcp_stats_async();
    runtime.block_on(pending)
}
/// Reads `TABLE_VULN_FINDINGS` and keeps, per `(crate, version)` key, the finding with the
/// greatest check timestamp.
///
/// Each map value is `(ids, summary, checked_at_micros)`. The ids are recovered by
/// splitting the stored string on `,` and dropping empty pieces. When two rows for the same
/// key share a timestamp, the one read later wins.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a column does not have the
/// expected Arrow type.
pub async fn query_vuln_findings_async(
    &self,
) -> Result<std::collections::HashMap<(String, String), (Vec<String>, String, i64)>> {
    let table = self.catalog.load_table(&self.table_ident(TABLE_VULN_FINDINGS)).await?;
    let batches = scan_all(&table).await?;

    let mut newest: std::collections::HashMap<(String, String), (Vec<String>, String, i64)> =
        std::collections::HashMap::new();
    for batch in &batches {
        let crates = downcast::<StringArray>(batch, 0)?;
        let versions = downcast::<StringArray>(batch, 1)?;
        let id_lists = downcast::<StringArray>(batch, 2)?;
        let summaries = downcast::<StringArray>(batch, 3)?;
        let stamps = downcast::<TimestampMicrosecondArray>(batch, 4)?;
        for row in 0..batch.num_rows() {
            let checked_at = stamps.value(row);
            let slot = newest
                .entry((crates.value(row).to_string(), versions.value(row).to_string()))
                .or_insert_with(|| (Vec::new(), String::new(), i64::MIN));
            // Older than what is already stored for this key: nothing to do.
            if checked_at < slot.2 {
                continue;
            }
            let advisory_ids: Vec<String> = id_lists
                .value(row)
                .split(',')
                .filter(|piece| !piece.is_empty())
                .map(String::from)
                .collect();
            *slot = (advisory_ids, summaries.value(row).to_string(), checked_at);
        }
    }
    Ok(newest)
}
/// Blocking counterpart of [`Self::query_vuln_findings_async`], run on the warehouse's own
/// runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn query_vuln_findings(
    &self,
) -> Result<std::collections::HashMap<(String, String), (Vec<String>, String, i64)>> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.query_vuln_findings_async();
    runtime.block_on(pending)
}
/// Appends `rows` to `TABLE_VULN_FINDINGS`, one table row per finding.
///
/// The advisory ids of a finding are stored as a single string joined with `,`. An empty
/// slice is a no-op.
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to Arrow, the
/// record batch cannot be assembled, or the append fails.
pub async fn append_vuln_findings_async(&self, rows: &[VulnFinding]) -> Result<()> {
    if rows.is_empty() {
        return Ok(());
    }
    let table = self.catalog.load_table(&self.table_ident(TABLE_VULN_FINDINGS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    // Split the findings into columns in a single pass.
    let mut crates = Vec::with_capacity(rows.len());
    let mut versions = Vec::with_capacity(rows.len());
    let mut id_lists = Vec::with_capacity(rows.len());
    let mut summaries = Vec::with_capacity(rows.len());
    let mut stamps = Vec::with_capacity(rows.len());
    for finding in rows {
        crates.push(finding.crate_name.clone());
        versions.push(finding.version.clone());
        id_lists.push(finding.ids.join(","));
        summaries.push(finding.summary.clone());
        stamps.push(finding.checked_at_micros);
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(crates)),
        Arc::new(StringArray::from(versions)),
        Arc::new(StringArray::from(id_lists)),
        Arc::new(StringArray::from(summaries)),
        Arc::new(TimestampMicrosecondArray::from(stamps).with_timezone("+00:00")),
    ];
    let batch = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(&self.catalog, table, batch).await?;
    Ok(())
}
/// Blocking counterpart of [`Self::append_vuln_findings_async`], run on the warehouse's
/// own runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn append_vuln_findings(&self, rows: &[VulnFinding]) -> Result<()> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.append_vuln_findings_async(rows);
    runtime.block_on(pending)
}
/// Appends `components` to `TABLE_SBOM_COMPONENTS` as one snapshot of `repo`.
///
/// Every row is stamped with `snapshot_id`, the current time (taken once for the whole
/// batch, microseconds, `+00:00`) and `repo`, followed by the component's name, version
/// and license. An empty slice is a no-op.
///
/// # Errors
/// Fails if the table cannot be loaded, its schema cannot be converted to Arrow, the
/// record batch cannot be assembled, or the append fails.
pub async fn append_sbom_components_async(
    &self,
    repo: &str,
    snapshot_id: Uuid,
    components: &[SbomComponentRow],
) -> Result<()> {
    if components.is_empty() {
        return Ok(());
    }
    let table = self.catalog.load_table(&self.table_ident(TABLE_SBOM_COMPONENTS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);
    let rows = components.len();
    let snapshot = snapshot_id.to_string();
    let recorded_at = Utc::now().timestamp_micros();

    // Split the components into columns in a single pass.
    let mut names = Vec::with_capacity(rows);
    let mut versions = Vec::with_capacity(rows);
    let mut licenses = Vec::with_capacity(rows);
    for component in components {
        names.push(component.name.clone());
        versions.push(component.version.clone());
        licenses.push(component.license.clone());
    }

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(StringArray::from(vec![snapshot; rows])),
        Arc::new(TimestampMicrosecondArray::from(vec![recorded_at; rows]).with_timezone("+00:00")),
        Arc::new(StringArray::from(vec![repo.to_string(); rows])),
        Arc::new(StringArray::from(names)),
        Arc::new(StringArray::from(versions)),
        Arc::new(StringArray::from(licenses)),
    ];
    let batch = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(&self.catalog, table, batch).await?;
    Ok(())
}
/// Blocking counterpart of [`Self::append_sbom_components_async`], run on the warehouse's
/// own runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn append_sbom_components(
    &self,
    repo: &str,
    snapshot_id: Uuid,
    components: &[SbomComponentRow],
) -> Result<()> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.append_sbom_components_async(repo, snapshot_id, components);
    runtime.block_on(pending)
}
/// Returns the components of the most recent SBOM snapshot recorded for `repo`.
///
/// The most recent snapshot is the one whose rows carry the greatest timestamp. `Ok(None)`
/// is returned when `TABLE_SBOM_COMPONENTS` holds no row for `repo`.
///
/// Both passes over the scanned batches re-check the repo column row by row rather than
/// trusting the scan filter alone. Rows belonging to another repo therefore never reach the
/// result, even if two repos were written under the same snapshot id.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a column does not have the
/// expected Arrow type.
pub async fn query_sbom_components_async(
    &self,
    repo: &str,
) -> Result<Option<Vec<SbomComponentRow>>> {
    let table = self.catalog.load_table(&self.table_ident(TABLE_SBOM_COMPONENTS)).await?;
    let scan = table
        .scan()
        .with_filter(iceberg::expr::Reference::new("repo").equal_to(Datum::string(repo)))
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    // Pass 1: find the snapshot id that carries the newest timestamp for this repo.
    let mut latest: Option<(String, i64)> = None;
    for b in &batches {
        let snaps = downcast::<StringArray>(b, 0)?;
        let ts = downcast::<TimestampMicrosecondArray>(b, 1)?;
        let repos = downcast::<StringArray>(b, 2)?;
        for i in 0..b.num_rows() {
            if repos.value(i) != repo {
                continue;
            }
            let t = ts.value(i);
            if latest.as_ref().map_or(true, |(_, newest)| t > *newest) {
                latest = Some((snaps.value(i).to_string(), t));
            }
        }
    }
    let Some((snap, _)) = latest else { return Ok(None) };

    // Pass 2: collect that snapshot's rows, applying the same repo guard as pass 1.
    let mut out = Vec::new();
    for b in &batches {
        let snaps = downcast::<StringArray>(b, 0)?;
        let repos = downcast::<StringArray>(b, 2)?;
        let name = downcast::<StringArray>(b, 3)?;
        let version = downcast::<StringArray>(b, 4)?;
        let license = downcast::<StringArray>(b, 5)?;
        for i in 0..b.num_rows() {
            if repos.value(i) != repo || snaps.value(i) != snap {
                continue;
            }
            out.push(SbomComponentRow {
                name: name.value(i).to_string(),
                version: version.value(i).to_string(),
                license: license.value(i).to_string(),
            });
        }
    }
    Ok(Some(out))
}
/// Blocking counterpart of [`Self::query_sbom_components_async`], run on the warehouse's
/// own runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn query_sbom_components(&self, repo: &str) -> Result<Option<Vec<SbomComponentRow>>> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.query_sbom_components_async(repo);
    runtime.block_on(pending)
}
/// Reports whether `TABLE_KNOWLEDGE_SCANS` already holds a row for `(repo, git_sha)`.
///
/// The scan is filtered on `repo`; each returned row is then compared against both `repo`
/// (column 0) and `git_sha` (column 1). The search stops at the first match.
///
/// # Errors
/// Fails if the table cannot be loaded or scanned, or if a column does not have the
/// expected Arrow type.
pub async fn knowledge_scan_exists_async(&self, repo: &str, git_sha: &str) -> Result<bool> {
    let table = self.catalog.load_table(&self.table_ident(TABLE_KNOWLEDGE_SCANS)).await?;
    let scan = table
        .scan()
        .with_filter(iceberg::expr::Reference::new("repo").equal_to(Datum::string(repo)))
        .build()?;
    let batches: Vec<RecordBatch> = scan.to_arrow().await?.try_collect().await?;

    for batch in &batches {
        let repo_col = downcast::<StringArray>(batch, 0)?;
        let sha_col = downcast::<StringArray>(batch, 1)?;
        let found = (0..batch.num_rows())
            .any(|row| repo_col.value(row) == repo && sha_col.value(row) == git_sha);
        if found {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Blocking counterpart of [`Self::knowledge_scan_exists_async`], run on the warehouse's
/// own runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn knowledge_scan_exists(&self, repo: &str, git_sha: &str) -> Result<bool> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.knowledge_scan_exists_async(repo, git_sha);
    runtime.block_on(pending)
}
/// Records that `repo` was scanned at `git_sha`, unless such a row already exists.
///
/// When [`Self::knowledge_scan_exists_async`] reports a match, nothing is written.
/// Otherwise a single row `(repo, git_sha, snapshot_id, now)` is appended to
/// `TABLE_KNOWLEDGE_SCANS`, with the timestamp in microseconds at `+00:00`.
///
/// # Errors
/// Fails if the existence check fails, the table cannot be loaded, its schema cannot be
/// converted to Arrow, the record batch cannot be assembled, or the append fails.
pub async fn record_knowledge_scan_async(
    &self,
    repo: &str,
    git_sha: &str,
    snapshot_id: Uuid,
) -> Result<()> {
    let already_recorded = self.knowledge_scan_exists_async(repo, git_sha).await?;
    if already_recorded {
        return Ok(());
    }

    let table = self.catalog.load_table(&self.table_ident(TABLE_KNOWLEDGE_SCANS)).await?;
    let arrow_schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?);

    let repo_col = StringArray::from(vec![repo.to_string()]);
    let sha_col = StringArray::from(vec![git_sha.to_string()]);
    let snapshot_col = StringArray::from(vec![snapshot_id.to_string()]);
    let recorded_at_col =
        TimestampMicrosecondArray::from(vec![Utc::now().timestamp_micros()]).with_timezone("+00:00");

    let columns: Vec<Arc<dyn Array>> = vec![
        Arc::new(repo_col),
        Arc::new(sha_col),
        Arc::new(snapshot_col),
        Arc::new(recorded_at_col),
    ];
    let batch = RecordBatch::try_new(arrow_schema, columns)?;
    append_batch(&self.catalog, table, batch).await?;
    Ok(())
}
/// Blocking counterpart of [`Self::record_knowledge_scan_async`], run on the warehouse's
/// own runtime.
///
/// # Panics
/// Panics with `"rt present"` when `self.rt` holds no runtime.
pub fn record_knowledge_scan(&self, repo: &str, git_sha: &str, snapshot_id: Uuid) -> Result<()> {
    let runtime = self.rt.as_ref().expect("rt present");
    let pending = self.record_knowledge_scan_async(repo, git_sha, snapshot_id);
    runtime.block_on(pending)
}
}
/// One component of an SBOM snapshot, as written to and read back from the
/// SBOM components table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SbomComponentRow {
/// Component name.
pub name: String,
/// Component version string.
pub version: String,
/// License string of the component.
pub license: String,
}
/// One recorded MCP tool call; each call becomes one row of the MCP requests table.
#[derive(Debug, Clone)]
pub struct McpCall {
/// Timestamp of the call in microseconds; stored in a `+00:00` timestamp column.
pub ts_micros: i64,
/// Name of the tool that was called; statistics are grouped by this value.
pub tool: String,
/// Outcome of the call. Statistics treat every value other than `"ok"` as an error.
pub status: String,
/// Latency of the call in milliseconds.
pub latency_ms: i64,
}
/// Result of one vulnerability check for a crate version; each finding becomes one row
/// of the vulnerability findings table.
#[derive(Debug, Clone)]
pub struct VulnFinding {
/// Crate name; together with `version` it forms the lookup key when findings are queried.
pub crate_name: String,
/// Crate version string.
pub version: String,
/// Identifiers attached to the finding. They are stored joined with `,` and split on `,`
/// when read back, so an id containing a comma would not round-trip.
pub ids: Vec<String>,
/// Summary text of the finding.
pub summary: String,
/// Time of the check in microseconds; the finding with the greatest value wins per key.
pub checked_at_micros: i64,
}
/// Aggregated statistics for one MCP tool, computed over all recorded calls of that tool.
#[derive(Debug, Clone)]
pub struct McpToolStat {
/// Name of the tool the statistics belong to.
pub tool: String,
/// Total number of recorded calls.
pub calls: u64,
/// Number of calls whose status was anything other than `"ok"`.
pub errors: u64,
/// Sum of the call latencies divided by `calls`, in milliseconds.
pub avg_latency_ms: f64,
/// Greatest call timestamp seen for the tool, in microseconds.
pub last_ts_micros: i64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bench::{BenchResult, BenchRun, TestOutcome};
/// Builds a fixed `BenchRun` fixture for `machine` with one result named `"x"` whose only
/// metric is `ops_sec = ops`, plus one passing test outcome named `"smoke"`.
fn sample_run(machine: &str, ops: f64) -> BenchRun {
    let mut ops_metric = serde_json::Map::new();
    ops_metric.insert("ops_sec".into(), serde_json::json!(ops));

    let only_result = BenchResult { name: "x".into(), metrics: ops_metric };
    let smoke_test = TestOutcome {
        name: "smoke".into(),
        passed: true,
        duration_ms: Some(1.5),
        message: None,
    };

    BenchRun {
        date: "2026-05-30".into(),
        timestamp: Some("2026-05-30T21:00:00Z".into()),
        version: "0.1.0".into(),
        machine: machine.into(),
        cores: 32,
        results: vec![only_result],
        tests: vec![smoke_test],
    }
}
/// A bench run appended to a fresh warehouse is returned unchanged by a repo-scoped query.
#[test]
fn roundtrip_bench_run() {
    let tmp = tempfile::tempdir().unwrap();
    let warehouse = IcebergWarehouse::open(tmp.path()).unwrap();
    let _id = warehouse.append_bench_run("holger", &sample_run("ryzen", 123.0)).unwrap();

    let stored = warehouse.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
    assert_eq!(stored.len(), 1, "expected exactly one bench run");
    let run = &stored[0];
    assert_eq!(run.machine, "ryzen");
    assert_eq!(run.cores, 32);

    // The single result and its metric survive the round trip.
    assert_eq!(run.results.len(), 1);
    let result = &run.results[0];
    assert_eq!(result.name, "x");
    let ops = result.metrics.get("ops_sec").unwrap().as_f64().unwrap();
    assert!((ops - 123.0).abs() < 1e-9);

    // So does the single test outcome.
    assert_eq!(run.tests.len(), 1);
    let outcome = &run.tests[0];
    assert!(outcome.passed);
    assert_eq!(outcome.duration_ms, Some(1.5));
}
// Telemetry sampled during a CPU-burning workload must be stored as a row returned by
// `query_bench_telemetry`, and the `telem_*` metric keys must not show up among the
// metrics returned by `query_bench_runs`.
#[test]
fn bench_telemetry_lands_as_warehouse_rows() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let sampler = crate::bench::telemetry::Sampler::start();
let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let mut handles = Vec::new();
// Spin at least two busy threads (one per available core) so the sampler sees load.
let n = std::thread::available_parallelism().map(|x| x.get()).unwrap_or(2).max(2);
for _ in 0..n {
let s = std::sync::Arc::clone(&stop);
handles.push(std::thread::spawn(move || {
let mut x = 0u64;
while !s.load(std::sync::atomic::Ordering::Relaxed) {
x = x.wrapping_mul(2654435761).wrapping_add(1);
// black_box keeps the optimizer from removing the busy loop.
std::hint::black_box(x);
}
}));
}
// Burn for 2.2 s; the elapsed_ms assertion below expects at least 2000 ms.
std::thread::sleep(std::time::Duration::from_millis(2200));
stop.store(true, std::sync::atomic::Ordering::Relaxed);
for h in handles {
let _ = h.join();
}
let telem = sampler.stop();
let mut metrics = serde_json::Map::new();
metrics.insert("ops_sec".into(), serde_json::json!(999.0));
crate::bench::telemetry::inject_into_metrics(&mut metrics, &telem);
let run = BenchRun {
date: "2026-06-12".into(),
timestamp: Some("2026-06-12T10:00:00Z".into()),
version: "0.1.0".into(),
machine: "test-host".into(),
cores: telem.n_cores.max(1),
results: vec![BenchResult { name: "demo.cpu_burn".into(), metrics }],
tests: vec![],
};
let run_id = wh.append_bench_run("demo", &run).unwrap().to_string();
let rows = wh.query_bench_telemetry(Some("demo")).unwrap();
assert_eq!(rows.len(), 1, "expected exactly one telemetry row");
let row = &rows[0];
assert_eq!(row.run_id, run_id);
assert_eq!(row.repo, "demo");
assert_eq!(row.bench, "demo.cpu_burn");
assert!(row.elapsed_ms >= 2000.0, "elapsed_ms={}", row.elapsed_ms);
// The CPU/memory assertions only run when a fresh sampler reports cores or /proc/stat
// is readable.
// NOTE(review): the sampler started in this condition is never stopped — confirm that
// dropping it is enough to release whatever it holds.
if crate::bench::telemetry::Sampler::start().n_cores() > 0
|| std::fs::read_to_string("/proc/stat").is_ok()
{
assert!(row.n_cores >= 1, "n_cores={}", row.n_cores);
assert!(
row.cores_busy_max >= 1,
"expected ≥1 busy core under an N-thread spin, got {}",
row.cores_busy_max
);
assert!(row.cpu_pct_max > 0.0, "cpu_pct_max={}", row.cpu_pct_max);
assert!(row.mem_peak_mb > 0.0, "mem_peak_mb={}", row.mem_peak_mb);
}
// The regular bench query must keep the plain metric and carry no telem_* keys.
let runs = wh.query_bench_runs(&BenchFilter::for_repo("demo")).unwrap();
assert_eq!(runs.len(), 1);
let res = &runs[0].results[0];
assert!(res.metrics.contains_key("ops_sec"));
assert!(
!res.metrics.keys().any(|k| k.starts_with("telem_")),
"telem_* keys must be routed to bench_telemetry, not bench_results: {:?}",
res.metrics.keys().collect::<Vec<_>>()
);
}
// While another handle holds the catalog, a plain `open` must fail with a catalog lock
// error, whereas `open_read_only` must fall back to a copied-aside snapshot that still
// contains the previously written row.
#[test]
fn open_read_only_degrades_when_catalog_locked() {
let dir = tempfile::tempdir().unwrap();
// Seed one row, then drop the writer at the end of the scope.
{
let wh = IcebergWarehouse::open(dir.path()).unwrap();
wh.append_bench_run("holger", &sample_run("ryzen", 42.0)).unwrap();
}
// Kept alive until the end of the test so the catalog stays held.
let _server = IcebergWarehouse::open(dir.path()).unwrap();
match IcebergWarehouse::open(dir.path()) {
Ok(_) => panic!("plain open should not acquire an already-held lock"),
Err(err) => assert!(
is_catalog_lock_error(&err),
"expected a catalog lock error, got: {err:#}"
),
}
let ro = IcebergWarehouse::open_read_only(dir.path())
.expect("open_read_only should degrade to a snapshot, not error");
assert!(ro._snapshot.is_some(), "expected a copied-aside snapshot");
let runs = ro.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
assert_eq!(runs.len(), 1, "snapshot must reflect the pre-lock row");
let ops = runs[0].results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap();
assert!((ops - 42.0).abs() < 1e-9);
}
// A read-only open performed while the writer is still alive must use a snapshot and
// that snapshot must contain every row the writer has committed so far.
#[test]
fn open_read_only_reads_latest_committed_under_live_lock() {
let dir = tempfile::tempdir().unwrap();
let server = IcebergWarehouse::open(dir.path()).unwrap();
server.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
server.append_bench_run("holger", &sample_run("ryzen", 2.0)).unwrap();
server.append_bench_run("holger", &sample_run("ryzen", 3.0)).unwrap();
let ro = IcebergWarehouse::open_read_only(dir.path())
.expect("open_read_only should degrade to a snapshot, not error");
assert!(
ro._snapshot.is_some(),
"server holds the lock → reader must use a copied-aside snapshot"
);
let runs = ro.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
// Compare the sorted ops values so the check does not depend on query order.
let mut ops: Vec<f64> = runs
.iter()
.map(|r| r.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap())
.collect();
ops.sort_by(|a, b| a.total_cmp(b));
assert_eq!(
ops,
vec![1.0, 2.0, 3.0],
"lock-contended read must see EVERY committed row (latest state), not a \
stale/empty subset"
);
// Explicit drops: the reader goes first, then the writer that was alive throughout.
drop(ro);
drop(server);
}
// With a writer thread appending continuously, every read-only open must succeed and
// must see at least as many rows as were counted as committed before that open began.
#[test]
fn open_read_only_snapshot_is_consistent_with_concurrent_writer() {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc as StdArc;
let dir = tempfile::tempdir().unwrap();
let server = IcebergWarehouse::open(dir.path()).unwrap();
server.append_bench_run("holger", &sample_run("ryzen", 0.0)).unwrap();
let server = StdArc::new(server);
// Starts at 1 to account for the seed row appended above.
let committed = StdArc::new(AtomicUsize::new(1));
let stop = StdArc::new(std::sync::atomic::AtomicBool::new(false));
let writer = {
let server = StdArc::clone(&server);
let committed = StdArc::clone(&committed);
let stop = StdArc::clone(&stop);
std::thread::spawn(move || {
let mut n = 1.0_f64;
while !stop.load(Ordering::Relaxed) {
server.append_bench_run("holger", &sample_run("ryzen", n)).unwrap();
// Incremented only after the append returned, so the counter never runs ahead
// of the rows actually written.
committed.fetch_add(1, Ordering::SeqCst);
n += 1.0;
}
})
};
let mut max_seen = 0usize;
for _ in 0..40 {
// Read the floor BEFORE opening: rows counted here must be visible in the snapshot.
let floor = committed.load(Ordering::SeqCst);
let ro = IcebergWarehouse::open_read_only(dir.path())
.expect("concurrent lock-tolerant open must not fail on a torn copy");
let n = ro.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap().len();
assert!(
n >= floor,
"snapshot saw {n} rows but {floor} were already committed before the copy — stale read"
);
max_seen = max_seen.max(n);
drop(ro);
}
stop.store(true, Ordering::Relaxed);
writer.join().unwrap();
// Sanity check that the writer really ran concurrently with the readers.
assert!(max_seen > 1, "writer should have advanced past the seed row");
}
// Drives `copy_catalog_consistent_with` with a fake copy routine that grows the source
// file during each of its first three calls, so those copies come out shorter than the
// source. The helper must keep retrying and finally return a copy whose length equals
// the stabilised source length.
#[test]
fn copy_catalog_consistent_rejects_short_growing_copies_until_stable() {
use std::cell::Cell;
let dir = tempfile::tempdir().unwrap();
let live = dir.path().join("catalog.redb");
let snap = dir.path().join("snap.redb");
std::fs::write(&live, vec![1u8; 1024 * 1024]).unwrap();
let attempt = Cell::new(0usize);
let first_short_copy_len = Cell::new(0u64);
let res = copy_catalog_consistent_with(&live, &snap, |s, d| {
let n = attempt.get();
attempt.set(n + 1);
// Source length as observed at the start of this copy attempt.
let src_len = std::fs::metadata(s).unwrap().len();
if n < 3 {
// Attempts 0..=2: grow the source by 256 KiB, then write a destination that only has
// the pre-growth length — i.e. a copy that is shorter than the source is now.
let mut body = std::fs::read(s).unwrap();
body.extend(std::iter::repeat(7u8).take(256 * 1024));
std::fs::write(s, &body).unwrap(); std::fs::write(d, vec![1u8; src_len as usize]).unwrap(); if n == 0 {
first_short_copy_len.set(src_len);
}
} else {
// From attempt 3 on the source no longer changes, so a plain copy is complete.
std::fs::copy(s, d).unwrap();
}
Ok(())
});
res.expect("must converge on a size-stable copy");
let stable_src_len = std::fs::metadata(&live).unwrap().len();
let got = std::fs::metadata(&snap).unwrap().len();
assert!(
first_short_copy_len.get() < stable_src_len,
"test precondition: the first (blind) copy must be short ({} < {})",
first_short_copy_len.get(),
stable_src_len
);
assert_eq!(
got, stable_src_len,
"fix must return the size-stable, full-length copy ({stable_src_len}), not a \
short growth-torn image like the pre-fix blind copy ({})",
first_short_copy_len.get()
);
// Three short attempts plus at least one stable one.
assert!(attempt.get() >= 4, "expected retries past the short copies, got {}", attempt.get());
}
/// When no other handle is alive, `open_read_only` must open the catalog directly
/// (no copied-aside snapshot) and still see the previously written row.
#[test]
fn open_read_only_is_exclusive_when_unlocked() {
    let tmp = tempfile::tempdir().unwrap();

    // Write one row and release the writer before the read-only open.
    let writer = IcebergWarehouse::open(tmp.path()).unwrap();
    writer.append_bench_run("holger", &sample_run("ryzen", 7.0)).unwrap();
    drop(writer);

    let reader = IcebergWarehouse::open_read_only(tmp.path()).unwrap();
    assert!(reader._snapshot.is_none(), "unlocked open must not snapshot");
    let stored = reader.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
    assert_eq!(stored.len(), 1);
}
/// Runs appended for different repos must only be returned by queries for their own repo.
#[test]
fn partitioned_bench_runs_scope_to_repo() {
    /// Extracts the `ops_sec` metric of a run's first result.
    fn ops_of(run: &BenchRun) -> f64 {
        run.results[0].metrics.get("ops_sec").unwrap().as_f64().unwrap()
    }

    let tmp = tempfile::tempdir().unwrap();
    let warehouse = IcebergWarehouse::open(tmp.path()).unwrap();
    warehouse.append_bench_run("holger", &sample_run("ryzen", 1.0)).unwrap();
    warehouse.append_bench_run("znippy", &sample_run("ryzen", 2.0)).unwrap();
    warehouse.append_bench_run("holger", &sample_run("ryzen", 3.0)).unwrap();

    let holger_runs = warehouse.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
    let mut holger_ops: Vec<f64> = holger_runs.iter().map(ops_of).collect();
    holger_ops.sort_by(|lhs, rhs| lhs.total_cmp(rhs));
    assert_eq!(holger_ops, vec![1.0, 3.0], "only holger's two runs");

    let znippy_runs = warehouse.query_bench_runs(&BenchFilter::for_repo("znippy")).unwrap();
    assert_eq!(znippy_runs.len(), 1, "only znippy's one run");
    assert!((ops_of(&znippy_runs[0]) - 2.0).abs() < 1e-9);
}
/// A filter carrying both a repo and a machine must return only that machine's run.
#[test]
fn filter_by_machine() {
    let tmp = tempfile::tempdir().unwrap();
    let warehouse = IcebergWarehouse::open(tmp.path()).unwrap();
    for (machine, ops) in [("ryzen", 1.0), ("threadripper", 2.0)] {
        warehouse.append_bench_run("holger", &sample_run(machine, ops)).unwrap();
    }

    let only_threadripper = BenchFilter {
        repo: Some("holger".into()),
        machine: Some("threadripper".into()),
        limit: None,
    };
    let matched = warehouse.query_bench_runs(&only_threadripper).unwrap();
    assert_eq!(matched.len(), 1);
    assert_eq!(matched[0].machine, "threadripper");
}
/// Data written through one handle must be visible after the warehouse is reopened.
#[test]
fn reopen_sees_previous_data() {
    let tmp = tempfile::tempdir().unwrap();

    // First session: write one run, then close the handle.
    let first = IcebergWarehouse::open(tmp.path()).unwrap();
    first.append_bench_run("holger", &sample_run("ryzen", 7.0)).unwrap();
    drop(first);

    // Second session: the run must still be there.
    let second = IcebergWarehouse::open(tmp.path()).unwrap();
    let stored = second.query_bench_runs(&BenchFilter::for_repo("holger")).unwrap();
    assert_eq!(stored.len(), 1);
}
// MCP calls appended in two separate batches must be aggregated per tool: call count,
// error count (status other than "ok"), average latency and newest timestamp.
#[test]
fn mcp_calls_round_trip_and_aggregate() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
let t0 = 1_900_000_000_000_000i64;
// First batch: three "search" calls (one failing) and one "deps_of" call.
wh.append_mcp_calls(&[
McpCall { ts_micros: t0, tool: "search".into(), status: "ok".into(), latency_ms: 12 },
McpCall { ts_micros: t0 + 1, tool: "search".into(), status: "err".into(), latency_ms: 8 },
McpCall { ts_micros: t0 + 2, tool: "search".into(), status: "ok".into(), latency_ms: 10 },
McpCall { ts_micros: t0 + 3, tool: "deps_of".into(), status: "ok".into(), latency_ms: 20 },
])
.unwrap();
// Second batch: one more "deps_of" call, carrying the newest timestamp.
wh.append_mcp_calls(&[McpCall {
ts_micros: t0 + 4,
tool: "deps_of".into(),
status: "ok".into(),
latency_ms: 30,
}])
.unwrap();
let stats = wh.query_mcp_stats().unwrap();
assert_eq!(stats.len(), 2);
// Results are ordered by call count, so "search" (3 calls) precedes "deps_of" (2 calls).
assert_eq!(stats[0].tool, "search");
assert_eq!(stats[0].calls, 3);
assert_eq!(stats[0].errors, 1);
// (12 + 8 + 10) / 3 = 10
assert!((stats[0].avg_latency_ms - 10.0).abs() < 1e-9);
assert_eq!(stats[1].tool, "deps_of");
assert_eq!(stats[1].calls, 2);
assert_eq!(stats[1].errors, 0);
// (20 + 30) / 2 = 25
assert!((stats[1].avg_latency_ms - 25.0).abs() < 1e-9);
assert_eq!(stats[1].last_ts_micros, t0 + 4);
}
// Querying SBOM components must return only the most recently appended snapshot of a
// repo, and `None` for a repo that has no rows at all.
#[test]
fn sbom_components_latest_snapshot_wins() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
// Nothing has been written yet.
assert!(wh.query_sbom_components("znippy").unwrap().is_none());
let snap1 = Uuid::new_v4();
wh.append_sbom_components(
"znippy",
snap1,
&[
SbomComponentRow { name: "serde".into(), version: "1.0.0".into(), license: "MIT".into() },
SbomComponentRow { name: "anyhow".into(), version: "1.0.80".into(), license: "MIT OR Apache-2.0".into() },
],
)
.unwrap();
let got = wh.query_sbom_components("znippy").unwrap().unwrap();
assert_eq!(got.len(), 2);
assert!(got.iter().any(|c| c.name == "serde" && c.version == "1.0.0" && c.license == "MIT"));
// A second, later snapshot with a single component replaces the first one in the result.
let snap2 = Uuid::new_v4();
wh.append_sbom_components(
"znippy",
snap2,
&[SbomComponentRow { name: "serde".into(), version: "1.0.1".into(), license: "MIT".into() }],
)
.unwrap();
let got = wh.query_sbom_components("znippy").unwrap().unwrap();
assert_eq!(got.len(), 1, "only the latest snapshot's components");
assert_eq!(got[0].version, "1.0.1");
// A different repo is unaffected by znippy's rows.
assert!(wh.query_sbom_components("other").unwrap().is_none());
}
// Recording the same (repo, sha) twice must leave exactly one row in the knowledge-scan
// table, and existence checks must be specific to both the repo and the sha.
#[test]
fn knowledge_scan_ledger_is_idempotent_per_sha() {
let dir = tempfile::tempdir().unwrap();
let wh = IcebergWarehouse::open(dir.path()).unwrap();
assert!(!wh.knowledge_scan_exists("znippy", "deadbeef").unwrap());
wh.record_knowledge_scan("znippy", "deadbeef", Uuid::new_v4()).unwrap();
assert!(wh.knowledge_scan_exists("znippy", "deadbeef").unwrap());
// Second record for the same pair, with a different snapshot id: must be ignored.
wh.record_knowledge_scan("znippy", "deadbeef", Uuid::new_v4()).unwrap();
// Count the raw rows in the table to prove no duplicate was appended.
let n = wh.block_on(async {
let t = wh.catalog().load_table(&wh.table_ident(TABLE_KNOWLEDGE_SCANS)).await.unwrap();
let batches = scan_all(&t).await.unwrap();
batches.iter().map(|b| b.num_rows()).sum::<usize>()
});
assert_eq!(n, 1, "duplicate record_knowledge_scan must not add a row");
// Same repo with another sha, and another repo with the same sha, are both unknown.
assert!(!wh.knowledge_scan_exists("znippy", "cafef00d").unwrap());
assert!(!wh.knowledge_scan_exists("other", "deadbeef").unwrap());
wh.record_knowledge_scan("znippy", "cafef00d", Uuid::new_v4()).unwrap();
assert!(wh.knowledge_scan_exists("znippy", "cafef00d").unwrap());
}
// Ignored A/B benchmark comparing two write/read paths over the same data:
//   A — this module's `append_batch` / `scan_all` on a `RedbCatalog`,
//   B — the `skade` crate's `append` / `read`.
// Each path is run once unpartitioned and once partitioned by `repo`. The only hard
// assertions are that both paths read back the same number of rows and that this number
// equals commits × rows; throughput figures are printed, not asserted.
// The workload size comes from the `AB_COMMITS` (default 200) and `AB_ROWS` (default 1000)
// environment variables.
#[test]
#[ignore = "A/B benchmark; run with --release --ignored --nocapture"]
fn skade_ab_write_read() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
// Reads a usize from the environment, falling back to `d` when unset or unparsable.
let env = |k: &str, d: usize| {
std::env::var(k).ok().and_then(|v| v.parse().ok()).unwrap_or(d)
};
let commits = env("AB_COMMITS", 200);
let rows = env("AB_ROWS", 1000);
let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Utf8, false),
Field::new("repo", DataType::Utf8, false),
Field::new("machine", DataType::Utf8, false),
Field::new("cores", DataType::Int32, false),
Field::new("seq", DataType::Int64, false),
Field::new("score", DataType::Float64, false),
]));
// One batch worth of columns, reused for every commit on both paths.
let columns: Vec<arrow::array::ArrayRef> = vec![
Arc::new(StringArray::from((0..rows).map(|i| format!("id-{i:08}")).collect::<Vec<_>>())),
Arc::new(StringArray::from(vec!["nornir"; rows])),
Arc::new(StringArray::from(vec!["oden"; rows])),
Arc::new(Int32Array::from(vec![32i32; rows])),
Arc::new(Int64Array::from((0..rows as i64).collect::<Vec<_>>())),
Arc::new(Float64Array::from((0..rows).map(|i| i as f64).collect::<Vec<_>>())),
];
let bytes_per_batch: usize = columns.iter().map(|c| c.get_array_memory_size()).sum();
let total_rows = (commits * rows) as f64;
let total_mb = (commits * bytes_per_batch) as f64 / 1e6;
let ice_schema = skade::arrow_to_iceberg(&arrow_schema).unwrap();
let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap();
// Throughput helpers: megabytes, rows and commits per second for a measured duration.
let mbs = |dt: Duration| total_mb / dt.as_secs_f64();
let rps = |dt: Duration| total_rows / dt.as_secs_f64();
let cps = |dt: Duration| commits as f64 / dt.as_secs_f64();
println!("\n══ skade A/B (commits={commits}, rows/batch={rows}, {total_mb:.1} MB, {total_rows:.0} rows) ══");
let mut a_writes: Vec<(&str, Duration)> = Vec::new();
let mut b_writes: Vec<(&str, Duration)> = Vec::new();
for (label, part_cols) in [
("unpartitioned", &[] as &[&str]),
("partitioned by `repo`", &["repo"] as &[&str]),
] {
// Path A: catalog + table set up by hand, then one load_table + append_batch per commit.
let dir_a = tempfile::tempdir().unwrap();
let (a_write, a_read, a_rows) = rt.block_on(async {
let wh_dir = dir_a.path().join("warehouse");
std::fs::create_dir_all(&wh_dir).unwrap();
let catalog = RedbCatalogBuilder::default()
.db_path(dir_a.path().join("catalog.redb"))
.warehouse_location(format!("file://{}", wh_dir.canonicalize().unwrap().display()))
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await
.unwrap();
let ns = NamespaceIdent::new("nornir".to_string());
catalog.create_namespace(&ns, HashMap::new()).await.unwrap();
create_partitioned_table_if_missing(&catalog, &ns, "ab", ice_schema.clone(), part_cols)
.await
.unwrap();
let ident = TableIdent::new(ns, "ab".into());
let table0 = catalog.load_table(&ident).await.unwrap();
let a_schema = Arc::new(schema_to_arrow_schema(table0.metadata().current_schema()).unwrap());
let batch_a = RecordBatch::try_new(a_schema, columns.clone()).unwrap();
// Timed section: the table is reloaded before every append.
let t0 = Instant::now();
for _ in 0..commits {
let table = catalog.load_table(&ident).await.unwrap();
append_batch(&catalog, table, batch_a.clone()).await.unwrap();
}
let write = t0.elapsed();
let table = catalog.load_table(&ident).await.unwrap();
let t1 = Instant::now();
let batches = scan_all(&table).await.unwrap();
let read = t1.elapsed();
(write, read, batches.iter().map(|b| b.num_rows()).sum::<usize>())
});
// Path B: the same workload through the skade crate's table handle.
let dir_b = tempfile::tempdir().unwrap();
let (b_write, b_read, b_rows) = rt.block_on(async {
let wh = skade::open(dir_b.path()).await.unwrap();
let mut table = wh.create_partitioned_table("ab", &arrow_schema, part_cols).await.unwrap();
let batch_b = RecordBatch::try_new(table.arrow_schema().unwrap(), columns.clone()).unwrap();
let t0 = Instant::now();
for _ in 0..commits {
table.append(std::slice::from_ref(&batch_b)).await.unwrap();
}
let write = t0.elapsed();
let t1 = Instant::now();
let batches = table.read().await.unwrap();
let read = t1.elapsed();
(write, read, batches.iter().map(|b| b.num_rows()).sum::<usize>())
});
// Correctness gate: both paths must return every appended row.
assert_eq!(a_rows, b_rows, "[{label}] both paths must read back the same row count");
assert_eq!(a_rows, commits * rows, "[{label}] all appended rows must be readable");
println!("\n── {label} ──");
println!("WRITE nornir(append_batch): {:>8.0} rows/s {:>6.1} commits/s {:>6.1} MB/s ({:?})",
rps(a_write), cps(a_write), mbs(a_write), a_write);
println!("WRITE skade(append): {:>8.0} rows/s {:>6.1} commits/s {:>6.1} MB/s ({:?})",
rps(b_write), cps(b_write), mbs(b_write), b_write);
// The printed ratio is path A's time divided by path B's time, so values above 1 mean
// path B finished faster.
println!(" → skade/nornir write ratio: {:.2}x ({})",
a_write.as_secs_f64() / b_write.as_secs_f64(),
if b_write <= a_write { "skade ≥ nornir ✓" } else { "skade slower" });
println!("READ nornir(scan_all): {:>8.0} rows/s ({:?})", rps(a_read), a_read);
println!("READ skade(read): {:>8.0} rows/s ({:?})", rps(b_read), b_read);
println!(" → skade/nornir read ratio: {:.2}x",
a_read.as_secs_f64() / b_read.as_secs_f64());
a_writes.push((label, a_write));
b_writes.push((label, b_write));
}
// With both layouts measured, report how much slower the partitioned write was per path.
if a_writes.len() == 2 {
let ov = |v: &[(&str, Duration)]| v[1].1.as_secs_f64() / v[0].1.as_secs_f64();
println!("\n── partition overhead (partitioned ÷ unpartitioned write) ──");
println!(" nornir(append_batch): {:.2}x skade(append): {:.2}x", ov(&a_writes), ov(&b_writes));
}
}
}