use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use iceberg::io::LocalFsStorageFactory;
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use skade_katalog::{RedbCatalog, RedbCatalogBuilder};
use tempfile::TempDir;
fn schema() -> Schema {
Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
])
.build()
.unwrap()
}
async fn open(db_path: &std::path::Path, warehouse: &std::path::Path) -> Result<RedbCatalog> {
Ok(RedbCatalogBuilder::default()
.db_path(db_path)
.warehouse_location(format!("file://{}", warehouse.display()))
.with_storage_factory(Arc::new(LocalFsStorageFactory))
.load("nornir", HashMap::new())
.await?)
}
#[tokio::test]
async fn commit_seq_advances_per_table_and_persists() -> Result<()> {
let tmp = TempDir::new()?;
let db_path = tmp.path().join("catalog.redb");
let warehouse = tmp.path().join("warehouse");
std::fs::create_dir_all(&warehouse)?;
{
let cat = open(&db_path, &warehouse).await?;
assert_eq!(cat.commit_seq().await?, 0, "fresh catalog has no commits");
let ns = NamespaceIdent::new("d".to_string());
cat.create_namespace(&ns, HashMap::new()).await?;
assert_eq!(cat.commit_seq().await?, 0);
for name in ["a", "b", "c"] {
let creation = TableCreation::builder()
.name(name.to_string())
.schema(schema())
.build();
cat.create_table(&ns, creation).await?;
}
assert_eq!(cat.commit_seq().await?, 3, "one commit per created table");
cat.rename_table(
&TableIdent::new(ns.clone(), "c".to_string()),
&TableIdent::new(ns.clone(), "c2".to_string()),
)
.await?;
assert_eq!(
cat.commit_seq().await?,
3,
"rename does not advance commit_seq"
);
}
let cat = open(&db_path, &warehouse).await?;
assert_eq!(
cat.commit_seq().await?,
3,
"commit_seq persists across reopen"
);
assert_eq!(
cat.pointer_generation(),
0,
"in-memory generation resets on reopen"
);
Ok(())
}