use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
use anyhow::Result;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use skade_katalog::RedbCatalogBuilder;
use tempfile::TempDir;
/// Builds the initial table schema (schema id 0) shared by every test: a
/// required string column `run_id` (field id 1) and a required double column
/// `ops_sec` (field id 2).
///
/// # Panics
///
/// Panics if the schema builder rejects the field list.
fn schema_v1() -> Schema {
    let run_id = NestedField::required(1, "run_id", Type::Primitive(PrimitiveType::String));
    let ops_sec = NestedField::required(2, "ops_sec", Type::Primitive(PrimitiveType::Double));
    Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![run_id.into(), ops_sec.into()])
        .build()
        .expect("schema v1")
}
/// Opens a fresh `RedbCatalog` named `"nornir"` rooted in `tmp`.
///
/// The redb database file is `<tmp>/catalog.redb`. Table data goes under
/// `<tmp>/warehouse`, which is created here and addressed via a `file://` URL
/// backed by the local-filesystem storage factory. `cache_bytes` is forwarded
/// unchanged to `metadata_cache_bytes`; the tests in this file pass `0` for
/// their cache-disabled configurations.
///
/// # Errors
///
/// Returns an error if the warehouse directory cannot be created or if the
/// catalog fails to load.
async fn catalog_with_cache(tmp: &TempDir, cache_bytes: u64) -> Result<skade_katalog::RedbCatalog> {
    let root = tmp.path();
    let warehouse_dir = root.join("warehouse");
    std::fs::create_dir_all(&warehouse_dir)?;
    let warehouse_url = format!("file://{}", warehouse_dir.display());

    let catalog = RedbCatalogBuilder::default()
        .db_path(root.join("catalog.redb"))
        .warehouse_location(warehouse_url)
        .metadata_cache_bytes(cache_bytes)
        .with_storage_factory(Arc::new(LocalFsStorageFactory))
        .load("nornir", HashMap::new())
        .await?;
    Ok(catalog)
}
/// Creates the namespace `bench` and a new table `bench_runs` (using
/// `schema_v1`) in `cat`, and returns the new table's identifier.
///
/// # Errors
///
/// Propagates any catalog error from namespace or table creation.
async fn seed_table(cat: &skade_katalog::RedbCatalog) -> Result<TableIdent> {
    const TABLE: &str = "bench_runs";

    let namespace = NamespaceIdent::new("bench".to_string());
    cat.create_namespace(&namespace, HashMap::new()).await?;

    let creation = TableCreation::builder()
        .name(TABLE.to_string())
        .schema(schema_v1())
        .build();
    cat.create_table(&namespace, creation).await?;

    Ok(TableIdent::new(namespace, TABLE.to_string()))
}
/// Regression test for a stale-cache read. With a 64 MiB metadata cache
/// enabled, the test loads a table, commits a property change through the
/// catalog, and reloads. The reload must return the post-commit metadata (new
/// property, advanced metadata location), not the copy loaded before the commit.
#[tokio::test]
async fn commit_is_never_masked_by_stale_cache() -> Result<()> {
    const MARKER: &str = "nornir.marker";

    let tmp = TempDir::new()?;
    let cat = catalog_with_cache(&tmp, 64 * 1024 * 1024).await?;
    let ident = seed_table(&cat).await?;

    // Load before committing so any cache entry holds the pre-commit metadata.
    let before = cat.load_table(&ident).await?;
    assert!(before.metadata().properties().get(MARKER).is_none());
    let location_before = before.metadata_location().map(str::to_string);

    // Commit a single property update through the catalog.
    let tx = Transaction::new(&before);
    let set_marker = tx
        .update_table_properties()
        .set(MARKER.to_string(), "v2".to_string());
    let tx = set_marker.apply(tx)?;
    tx.commit(&cat).await?;

    // The reload must observe the commit.
    let after = cat.load_table(&ident).await?;
    let marker = after.metadata().properties().get(MARKER);
    assert_eq!(
        marker,
        Some(&"v2".to_string()),
        "cache served a stale pre-commit metadata"
    );
    assert_ne!(
        after.metadata_location().map(str::to_string),
        location_before,
        "metadata location should advance after a commit"
    );
    Ok(())
}
#[tokio::test]
async fn disabled_cache_still_correct() -> Result<()> {
let tmp = TempDir::new()?;
let cat = catalog_with_cache(&tmp, 0).await?;
let ident = seed_table(&cat).await?;
let a = cat.load_table(&ident).await?;
let b = cat.load_table(&ident).await?;
assert_eq!(a.metadata_location(), b.metadata_location());
Ok(())
}
/// Spawns `READERS` concurrent `load_table` tasks against a cache-enabled
/// (64 MiB) catalog. Every task must succeed, and all of them must observe the
/// same non-`None` metadata location, so no reader sees divergent metadata.
///
/// NOTE(review): the readers are "cold" only if `create_table` does not itself
/// populate the cache — confirm. `#[tokio::test]` defaults to a current-thread
/// runtime, so these tasks interleave at `.await` points rather than run in
/// parallel. Use `#[tokio::test(flavor = "multi_thread")]` (requires tokio's
/// `rt-multi-thread` feature) if true parallelism is wanted.
#[tokio::test]
async fn concurrent_cold_readers_all_succeed() -> Result<()> {
    const READERS: usize = 64;

    let tmp = TempDir::new()?;
    let cat = catalog_with_cache(&tmp, 64 * 1024 * 1024).await?;
    let ident = seed_table(&cat).await?;

    let handles: Vec<_> = (0..READERS)
        .map(|_| {
            let cat = cat.clone();
            let ident = ident.clone();
            tokio::spawn(async move { cat.load_table(&ident).await })
        })
        .collect();

    // Keep each reader's view so we can check that they agree, not just that they succeeded.
    let mut locations = Vec::with_capacity(READERS);
    for h in handles {
        let table = h.await.expect("join")?;
        locations.push(table.metadata_location().map(str::to_string));
    }

    let first = &locations[0];
    assert!(first.is_some(), "concurrent reader saw no metadata location");
    assert!(
        locations.iter().all(|loc| loc == first),
        "concurrent readers disagreed on metadata location: {locations:?}"
    );
    Ok(())
}
/// Rough microbenchmark of the L0 metadata cache. It times:
/// - one first load on a cache-enabled (128 MiB) catalog,
/// - the mean of `WARM_ITERS` repeated loads on that catalog, and
/// - the mean of `COLD_ITERS` loads on a cache-disabled catalog (after one
///   untimed warm-up),
///
/// then prints the numbers and the warm-vs-disabled speedup to stderr.
///
/// Ignored by default; run with `--release --ignored --nocapture`. Results are
/// indicative only: there is no statistical analysis, and the temp dir may sit
/// on any filesystem (including tmpfs).
#[tokio::test]
#[ignore = "microbench; run with --release --ignored --nocapture"]
async fn warm_vs_cold_microbench() -> Result<()> {
    const WARM_ITERS: u32 = 20_000;
    const COLD_ITERS: u32 = 500;

    // Cache-enabled catalog.
    let tmp_on = TempDir::new()?;
    let cat = catalog_with_cache(&tmp_on, 128 * 1024 * 1024).await?;
    let ident = seed_table(&cat).await?;

    // NOTE(review): this is reported as a miss below; that holds only if
    // `create_table` does not itself populate the cache — confirm.
    let t = Instant::now();
    let _ = std::hint::black_box(cat.load_table(&ident).await?);
    let cold_ns = t.elapsed().as_nanos();

    let t = Instant::now();
    for _ in 0..WARM_ITERS {
        // black_box marks the loaded table as used so the load cannot be elided.
        let _ = std::hint::black_box(cat.load_table(&ident).await?);
    }
    let warm_ns = t.elapsed().as_nanos() / u128::from(WARM_ITERS);

    // Cache-disabled baseline, in its own temp dir.
    let tmp_off = TempDir::new()?;
    let cat_off = catalog_with_cache(&tmp_off, 0).await?;
    let ident_off = seed_table(&cat_off).await?;
    // Untimed warm-up so one-off first-load costs don't skew the baseline.
    let _ = std::hint::black_box(cat_off.load_table(&ident_off).await?);

    let t = Instant::now();
    for _ in 0..COLD_ITERS {
        let _ = std::hint::black_box(cat_off.load_table(&ident_off).await?);
    }
    let off_ns = t.elapsed().as_nanos() / u128::from(COLD_ITERS);

    eprintln!("L0 metadata cache microbench (local temp dir, single thread)");
    eprintln!(" cold first load (miss) : {cold_ns:>10} ns");
    eprintln!(" warm load (L0 hit) : {warm_ns:>10} ns/op");
    eprintln!(" cache disabled (baseline): {off_ns:>10} ns/op");
    eprintln!(
        " warm speedup vs disabled : {:.1}x",
        // max(1) avoids dividing by zero if warm loads round down to 0 ns.
        off_ns as f64 / warm_ns.max(1) as f64
    );
    Ok(())
}