use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use skade_katalog::RedbCatalogBuilder;
use tempfile::TempDir;
/// Builds the initial table schema (schema id 0) shared by these tests:
/// a single required `run_id` string column with field id 1.
///
/// Panics if the schema builder rejects the definition.
fn schema_v1() -> Schema {
    let run_id = NestedField::required(1, "run_id", Type::Primitive(PrimitiveType::String));
    Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![run_id.into()])
        .build()
        .expect("schema")
}
/// Opens a `RedbCatalog` rooted in `tmp`.
///
/// The redb database file is `<tmp>/catalog.redb`. The warehouse directory
/// `<tmp>/warehouse` is created on disk first and then passed to the builder
/// as a `file://` URL, together with the local-filesystem storage factory.
/// The catalog is loaded under the name `"nornir"` with no extra properties.
///
/// # Errors
/// Fails if the warehouse directory cannot be created or the catalog fails to load.
async fn open(tmp: &TempDir) -> Result<skade_katalog::RedbCatalog> {
    let root = tmp.path();
    let warehouse_dir = root.join("warehouse");
    std::fs::create_dir_all(&warehouse_dir)?;
    let warehouse_url = format!("file://{}", warehouse_dir.display());

    let catalog = RedbCatalogBuilder::default()
        .db_path(root.join("catalog.redb"))
        .warehouse_location(warehouse_url)
        .with_storage_factory(Arc::new(LocalFsStorageFactory))
        .load("nornir", HashMap::new())
        .await?;
    Ok(catalog)
}
/// Returns a `TableCreation` request for a table named `name` that uses
/// the test schema from `schema_v1`.
fn creation(name: &str) -> TableCreation {
    let schema = schema_v1();
    TableCreation::builder()
        .name(name.to_owned())
        .schema(schema)
        .build()
}
/// Checks that table mutations advance the catalog's pointer generation and
/// that later reads observe them:
/// - `create_table` bumps the generation.
/// - A property commit bumps it again and changes the metadata location
///   that `load_table` serves.
/// - After `rename_table`, only the new identifier resolves.
/// - After `drop_table`, the table no longer resolves at all.
#[tokio::test]
async fn generation_advances_and_reads_reflect_mutations() -> Result<()> {
    let tmp = TempDir::new()?;
    let cat = open(&tmp).await?;
    let ns = NamespaceIdent::new("g".to_string());
    cat.create_namespace(&ns, HashMap::new()).await?;

    // Read the baseline only after the namespace exists. That way the
    // assertion below can be satisfied only by create_table, not by a bump
    // that create_namespace might cause.
    let g0 = cat.pointer_generation();
    cat.create_table(&ns, creation("t")).await?;
    let g_after_create = cat.pointer_generation();
    assert!(g_after_create > g0, "create_table should bump generation");

    // Commit a property change. It must bump the generation and move the
    // metadata location served on the next load.
    let ident = TableIdent::new(ns.clone(), "t".to_string());
    let loaded = cat.load_table(&ident).await?;
    let loc_v1 = loaded.metadata_location().map(str::to_string);
    let tx = Transaction::new(&loaded);
    let action = tx
        .update_table_properties()
        .set("k".to_string(), "v".to_string());
    action.apply(tx)?.commit(&cat).await?;
    let g_after_commit = cat.pointer_generation();
    assert!(
        g_after_commit > g_after_create,
        "commit should bump generation"
    );
    let reloaded = cat.load_table(&ident).await?;
    assert_ne!(
        reloaded.metadata_location().map(str::to_string),
        loc_v1,
        "mirror must serve the post-commit location"
    );

    // Rename: the old identifier disappears and the new one loads.
    let dst = TableIdent::new(ns.clone(), "t2".to_string());
    cat.rename_table(&ident, &dst).await?;
    assert!(!cat.table_exists(&ident).await?, "old key gone from mirror");
    assert!(cat.table_exists(&dst).await?, "new key present in mirror");
    assert!(cat.load_table(&ident).await.is_err());
    cat.load_table(&dst).await?;

    // Drop: the renamed table no longer exists or loads.
    cat.drop_table(&dst).await?;
    assert!(!cat.table_exists(&dst).await?);
    assert!(cat.load_table(&dst).await.is_err());
    Ok(())
}
/// Checks that a table created through one catalog instance can be loaded
/// through a second instance opened on the same directory.
///
/// The reopened instance must report a pointer generation of 0 and serve the
/// same metadata location the table had when it was created.
#[tokio::test]
async fn mirror_rebuilt_on_reopen() -> Result<()> {
    let tmp = TempDir::new()?;
    let ns = NamespaceIdent::new("re".to_string());
    let ident = TableIdent::new(ns.clone(), "t".to_string());

    // First instance: create the table and capture its metadata location.
    // The instance and the created table are dropped at the end of this
    // block, before the catalog is reopened below.
    let expected_loc = {
        let first = open(&tmp).await?;
        first.create_namespace(&ns, HashMap::new()).await?;
        let created = first.create_table(&ns, creation("t")).await?;
        let loc = created.metadata_location().map(str::to_string);
        assert!(first.pointer_generation() >= 1);
        loc
    };

    let reopened = open(&tmp).await?;
    assert_eq!(reopened.pointer_generation(), 0);
    let loaded = reopened.load_table(&ident).await?;
    assert_eq!(loaded.metadata_location().map(str::to_string), expected_loc);
    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
#[ignore = "microbench; run with --release --ignored --nocapture"]
async fn l1_read_breakdown_and_scaling() -> Result<()> {
use std::time::Instant;
const ITERS: u32 = 50_000;
let tmp = TempDir::new()?;
let cat = open(&tmp).await?;
let ns = NamespaceIdent::new("b".to_string());
cat.create_namespace(&ns, HashMap::new()).await?;
cat.create_table(&ns, creation("t")).await?;
let ident = TableIdent::new(ns, "t".to_string());
let loaded = cat.load_table(&ident).await?;
let md = loaded.metadata_ref();
let loc = loaded.metadata_location().unwrap().to_string();
let fileio = cat.file_io().clone();
let t = Instant::now();
for _ in 0..ITERS {
let _ = cat.load_table(&ident).await?;
}
let full_ns = t.elapsed().as_nanos() / ITERS as u128;
let t = Instant::now();
for _ in 0..ITERS {
let _ = iceberg::table::Table::builder()
.file_io(fileio.clone())
.identifier(ident.clone())
.metadata_location(loc.clone())
.metadata(md.clone())
.build()?;
}
let build_ns = t.elapsed().as_nanos() / ITERS as u128;
let t = Instant::now();
for _ in 0..ITERS {
let _ = cat.resolve_metadata(&ident).await?;
}
let resolve_ns = t.elapsed().as_nanos() / ITERS as u128;
eprintln!("L1 read breakdown (single thread, warm)");
eprintln!(" full load_table : {full_ns:>8} ns/op");
eprintln!(
" iceberg Table::build() : {build_ns:>8} ns/op ({}% of full)",
build_ns * 100 / full_ns.max(1)
);
eprintln!(" resolve_metadata (L1+L0): {resolve_ns:>7} ns/op ← catalog fast path");
for threads in [1usize, 4, 16] {
let t = Instant::now();
let mut handles = Vec::new();
for _ in 0..threads {
let cat = cat.clone();
let ident = ident.clone();
handles.push(tokio::spawn(async move {
for _ in 0..ITERS {
let _ = cat.load_table(&ident).await.unwrap();
}
}));
}
for h in handles {
h.await.unwrap();
}
let total = (threads as u128) * (ITERS as u128);
let ops_sec = total * 1_000_000_000 / t.elapsed().as_nanos().max(1);
eprintln!(" concurrency {threads:>2}: {ops_sec:>10} load_table/sec aggregate");
}
Ok(())
}