#![cfg(feature = "slatedb-storage")]
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use rustberg::catalog::SlateCatalog;
use slatedb::Db;
use std::collections::HashMap;
use std::sync::Arc;
use tempfile::TempDir;
/// Builds a fresh `SlateCatalog` backed by a brand-new temporary directory.
///
/// Returns the catalog together with its `TempDir` guard; callers must keep
/// the guard alive for as long as the catalog is in use, because dropping it
/// deletes the backing warehouse and catalog directories.
async fn create_test_catalog() -> (SlateCatalog, TempDir) {
    let temp_dir = TempDir::new().expect("Failed to create temp directory");
    let warehouse_path = temp_dir.path().join("warehouse");
    std::fs::create_dir_all(&warehouse_path).expect("Failed to create warehouse dir");
    let catalog_path = temp_dir.path().join("catalog");
    std::fs::create_dir_all(&catalog_path).expect("Failed to create catalog dir");

    // SlateDB persists its state through an object store rooted at `catalog_path`.
    let store = object_store::local::LocalFileSystem::new_with_prefix(&catalog_path)
        .expect("Failed to create LocalFileSystem");
    let db = Db::builder("db", Arc::new(store))
        .build()
        .await
        .expect("Failed to create SlateDB");

    // Table data and metadata files live under the warehouse, addressed via file://.
    let warehouse_location = format!("file://{}", warehouse_path.to_string_lossy());
    let catalog = SlateCatalog::new(Arc::new(db), warehouse_location)
        .await
        .expect("Failed to create SlateCatalog");
    (catalog, temp_dir)
}
/// Three-column schema (required `id`, optional `name`, optional
/// `created_at`) shared by every table-creation test in this file.
fn test_schema() -> Schema {
    let fields = vec![
        NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
        NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
        NestedField::optional(3, "created_at", Type::Primitive(PrimitiveType::Timestamp)).into(),
    ];
    Schema::builder()
        .with_fields(fields)
        .build()
        .expect("Failed to build schema")
}
/// Creating a namespace makes it visible in listings, and its properties
/// round-trip through `get_namespace`.
#[tokio::test]
async fn test_namespace_create_and_list() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("test_db".to_string());

    // Create with a single property attached.
    let props = HashMap::from([("owner".to_string(), "admin".to_string())]);
    catalog
        .create_namespace(&ns, props)
        .await
        .expect("Failed to create namespace");

    // The new namespace is the only top-level entry.
    let namespaces = catalog
        .list_namespaces(None)
        .await
        .expect("Failed to list namespaces");
    assert_eq!(namespaces.len(), 1);
    assert_eq!(namespaces[0], ns);

    // The stored property comes back unchanged.
    let namespace = catalog
        .get_namespace(&ns)
        .await
        .expect("Failed to get namespace");
    assert_eq!(
        namespace.properties().get("owner"),
        Some(&"admin".to_string())
    );
}
/// `namespace_exists` reports false before creation and true afterwards.
#[tokio::test]
async fn test_namespace_exists() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("check_db".to_string());

    // Absent before creation...
    let before = catalog
        .namespace_exists(&ns)
        .await
        .expect("Failed to check namespace");
    assert!(!before);

    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // ...and present afterwards.
    let after = catalog
        .namespace_exists(&ns)
        .await
        .expect("Failed to check namespace");
    assert!(after);
}
/// `update_namespace` replaces the stored property set; both an overwritten
/// key and a newly added key are visible on reload.
#[tokio::test]
async fn test_namespace_update_properties() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("update_db".to_string());

    // Start at version 1.
    let initial_props = HashMap::from([("version".to_string(), "1".to_string())]);
    catalog
        .create_namespace(&ns, initial_props)
        .await
        .expect("Failed to create namespace");

    // Bump the version and add a description.
    let new_props = HashMap::from([
        ("version".to_string(), "2".to_string()),
        ("description".to_string(), "Updated namespace".to_string()),
    ]);
    catalog
        .update_namespace(&ns, new_props)
        .await
        .expect("Failed to update namespace");

    // Both updated values survive a fresh read.
    let namespace = catalog
        .get_namespace(&ns)
        .await
        .expect("Failed to get namespace");
    let props = namespace.properties();
    assert_eq!(props.get("version"), Some(&"2".to_string()));
    assert_eq!(props.get("description"), Some(&"Updated namespace".to_string()));
}
/// Dropping an empty namespace succeeds and removes it from the catalog.
#[tokio::test]
async fn test_namespace_drop() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("drop_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    catalog
        .drop_namespace(&ns)
        .await
        .expect("Failed to drop namespace");

    // The namespace must be gone afterwards.
    let still_there = catalog
        .namespace_exists(&ns)
        .await
        .expect("Failed to check namespace");
    assert!(!still_there);
}
/// A namespace that still contains a table must refuse to be dropped.
#[tokio::test]
async fn test_namespace_drop_not_empty_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("nonempty_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Put one table inside so the namespace is non-empty.
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("test_table".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");

    // Dropping must now fail.
    assert!(catalog.drop_namespace(&ns).await.is_err());
}
/// A two-level namespace hierarchy: the parent shows up in top-level
/// listings, the child shows up when listing under the parent.
#[tokio::test]
async fn test_nested_namespace() {
    let (catalog, _temp) = create_test_catalog().await;

    // Create the parent, then a child underneath it.
    let parent_ns = NamespaceIdent::new("parent".to_string());
    catalog
        .create_namespace(&parent_ns, HashMap::new())
        .await
        .expect("Failed to create parent namespace");
    let child_ns =
        NamespaceIdent::from_vec(vec!["parent".to_string(), "child".to_string()]).unwrap();
    catalog
        .create_namespace(&child_ns, HashMap::new())
        .await
        .expect("Failed to create child namespace");

    // Parent appears at the root level.
    let roots = catalog
        .list_namespaces(None)
        .await
        .expect("Failed to list namespaces");
    assert!(roots.contains(&parent_ns));

    // Child appears when listing within the parent.
    let children = catalog
        .list_namespaces(Some(&parent_ns))
        .await
        .expect("Failed to list child namespaces");
    assert!(children.contains(&child_ns));
}
/// Creating a table returns the expected identity and schema, and loading
/// it back by identifier yields the same table (same UUID).
#[tokio::test]
async fn test_table_create_and_load() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("tables_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    let table = catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("users".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");

    // Identity and schema of the returned table match what was requested.
    let ident = table.identifier();
    assert_eq!(ident.name(), "users");
    assert_eq!(ident.namespace(), &ns);
    let field_count = table.metadata().current_schema().as_struct().fields().len();
    assert_eq!(field_count, 3);

    // Loading by identifier resolves to the same underlying table.
    let loaded = catalog
        .load_table(&TableIdent::new(ns.clone(), "users".to_string()))
        .await
        .expect("Failed to load table");
    assert_eq!(loaded.metadata().uuid(), table.metadata().uuid());
}
/// A caller-supplied table location must be honored verbatim instead of the
/// warehouse default.
#[tokio::test]
async fn test_table_create_with_location() {
    let (catalog, temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("loc_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Explicit location inside the warehouse directory.
    let custom_location = format!(
        "file://{}/custom_table",
        temp.path().join("warehouse").to_string_lossy()
    );
    let table = catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("custom_table".to_string())
                .schema(test_schema())
                .location(custom_location.clone())
                .build(),
        )
        .await
        .expect("Failed to create table");

    assert_eq!(table.metadata().location(), &custom_location);
}
/// Table properties supplied at creation time must survive into the stored
/// table metadata.
#[tokio::test]
async fn test_table_create_with_properties() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("props_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    let props = HashMap::from([
        ("write.format.default".to_string(), "parquet".to_string()),
        ("commit.retry.num-retries".to_string(), "5".to_string()),
    ]);
    let table = catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("props_table".to_string())
                .schema(test_schema())
                .properties(props)
                .build(),
        )
        .await
        .expect("Failed to create table");

    // Spot-check one of the supplied properties on the created table.
    assert_eq!(
        table.metadata().properties().get("write.format.default"),
        Some(&"parquet".to_string())
    );
}
/// Creating a table under a namespace that was never created must fail.
#[tokio::test]
async fn test_table_create_in_nonexistent_namespace_fails() {
    let (catalog, _temp) = create_test_catalog().await;

    // Note: no namespace is created here on purpose.
    let ns = NamespaceIdent::new("nonexistent_db".to_string());
    let creation = TableCreation::builder()
        .name("orphan_table".to_string())
        .schema(test_schema())
        .build();
    assert!(catalog.create_table(&ns, creation).await.is_err());
}
/// Creating the same table name twice in one namespace must conflict on the
/// second attempt.
#[tokio::test]
async fn test_table_create_duplicate_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("dup_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Builds a fresh creation request for the same table name each time.
    let make_creation = || {
        TableCreation::builder()
            .name("dup_table".to_string())
            .schema(test_schema())
            .build()
    };

    // First creation succeeds; the second must be rejected.
    catalog
        .create_table(&ns, make_creation())
        .await
        .expect("Failed to create first table");
    assert!(catalog.create_table(&ns, make_creation()).await.is_err());
}
/// `list_tables` returns every table created in a namespace.
#[tokio::test]
async fn test_table_list() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("list_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Populate the namespace with three tables.
    let expected = ["table1", "table2", "table3"];
    for name in expected {
        catalog
            .create_table(
                &ns,
                TableCreation::builder()
                    .name(name.to_string())
                    .schema(test_schema())
                    .build(),
            )
            .await
            .expect("Failed to create table");
    }

    // Listing returns exactly those three names (order unspecified).
    let tables = catalog
        .list_tables(&ns)
        .await
        .expect("Failed to list tables");
    assert_eq!(tables.len(), 3);
    let names: Vec<&str> = tables.iter().map(|t| t.name.as_str()).collect();
    for expected_name in expected {
        assert!(names.contains(&expected_name));
    }
}
/// `table_exists` reports false before the table is created and true after.
#[tokio::test]
async fn test_table_exists() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("exists_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let table_ident = TableIdent::new(ns.clone(), "check_table".to_string());

    // Missing before creation.
    let before = catalog
        .table_exists(&table_ident)
        .await
        .expect("Failed to check table");
    assert!(!before);

    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("check_table".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");

    // Present afterwards.
    let after = catalog
        .table_exists(&table_ident)
        .await
        .expect("Failed to check table");
    assert!(after);
}
/// Dropping a table removes it from the catalog.
#[tokio::test]
async fn test_table_drop() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("drop_table_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("drop_me".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");

    // Drop it, then confirm it no longer exists.
    let table_ident = TableIdent::new(ns.clone(), "drop_me".to_string());
    catalog
        .drop_table(&table_ident)
        .await
        .expect("Failed to drop table");
    let exists = catalog
        .table_exists(&table_ident)
        .await
        .expect("Failed to check table");
    assert!(!exists);
}
/// Renaming a table within one namespace: the old identifier disappears,
/// the new one resolves, and the table identity (UUID) is preserved.
#[tokio::test]
async fn test_table_rename_same_namespace() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("rename_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let table = catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("old_name".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");
    let original_uuid = table.metadata().uuid();

    // Perform the in-namespace rename.
    let src = TableIdent::new(ns.clone(), "old_name".to_string());
    let dest = TableIdent::new(ns.clone(), "new_name".to_string());
    catalog
        .rename_table(&src, &dest)
        .await
        .expect("Failed to rename table");

    // Old name gone, new name present.
    let src_exists = catalog
        .table_exists(&src)
        .await
        .expect("Failed to check table");
    assert!(!src_exists);
    let dest_exists = catalog
        .table_exists(&dest)
        .await
        .expect("Failed to check table");
    assert!(dest_exists);

    // Same underlying table under the new name.
    let loaded = catalog
        .load_table(&dest)
        .await
        .expect("Failed to load table");
    assert_eq!(loaded.metadata().uuid(), original_uuid);
}
/// Renaming a table into a different namespace (and a new name) works and
/// preserves the table identity (UUID).
#[tokio::test]
async fn test_table_rename_cross_namespace() {
    let (catalog, _temp) = create_test_catalog().await;

    // Two sibling namespaces: the table moves from the first to the second.
    let ns1 = NamespaceIdent::new("source_db".to_string());
    let ns2 = NamespaceIdent::new("target_db".to_string());
    for ns in [&ns1, &ns2] {
        catalog
            .create_namespace(ns, HashMap::new())
            .await
            .expect("Failed to create namespace");
    }
    let table = catalog
        .create_table(
            &ns1,
            TableCreation::builder()
                .name("moving_table".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");
    let original_uuid = table.metadata().uuid();

    // Destination changes both the namespace and the table name.
    let src = TableIdent::new(ns1.clone(), "moving_table".to_string());
    let dest = TableIdent::new(ns2.clone(), "moved_table".to_string());
    catalog
        .rename_table(&src, &dest)
        .await
        .expect("Failed to rename table");

    let src_exists = catalog
        .table_exists(&src)
        .await
        .expect("Failed to check table");
    assert!(!src_exists);
    let dest_exists = catalog
        .table_exists(&dest)
        .await
        .expect("Failed to check table");
    assert!(dest_exists);

    // Identity preserved across the move.
    let loaded = catalog
        .load_table(&dest)
        .await
        .expect("Failed to load table");
    assert_eq!(loaded.metadata().uuid(), original_uuid);
}
/// Data written through one catalog instance must survive a full restart:
/// a second SlateDB + SlateCatalog opened over the same object-store path
/// sees the namespace and table created by the first.
#[tokio::test]
async fn test_metadata_persists_to_filesystem() {
    let temp_dir = TempDir::new().expect("Failed to create temp directory");
    let warehouse_path = temp_dir.path().join("warehouse");
    let catalog_path = temp_dir.path().join("catalog");
    std::fs::create_dir_all(&warehouse_path).expect("Failed to create warehouse dir");
    std::fs::create_dir_all(&catalog_path).expect("Failed to create catalog dir");
    let warehouse_location = format!("file://{}", warehouse_path.to_string_lossy());
    let ns = NamespaceIdent::new("persist_db".to_string());
    let table_ident = TableIdent::new(ns.clone(), "persist_table".to_string());
    // First session: create a namespace and a table, then let the catalog
    // and DB drop at the end of the scope so all state reaches disk before
    // the second session opens the same path.
    {
        let object_store = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(&catalog_path)
                .expect("Failed to create LocalFileSystem"),
        );
        let db = Db::builder("db", object_store)
            .build()
            .await
            .expect("Failed to create SlateDB");
        let catalog = SlateCatalog::new(Arc::new(db), warehouse_location.clone())
            .await
            .expect("Failed to create SlateCatalog");
        catalog
            .create_namespace(&ns, HashMap::new())
            .await
            .expect("Failed to create namespace");
        let creation = TableCreation::builder()
            .name("persist_table".to_string())
            .schema(test_schema())
            .build();
        catalog
            .create_table(&ns, creation)
            .await
            .expect("Failed to create table");
    }
    // Second session: reopen over the same catalog path and verify that the
    // namespace and the table (with its 3-field schema) are still there.
    {
        let object_store = Arc::new(
            object_store::local::LocalFileSystem::new_with_prefix(&catalog_path)
                .expect("Failed to create LocalFileSystem"),
        );
        let db = Db::builder("db", object_store)
            .build()
            .await
            .expect("Failed to create SlateDB");
        let catalog = SlateCatalog::new(Arc::new(db), warehouse_location)
            .await
            .expect("Failed to create SlateCatalog");
        assert!(catalog
            .namespace_exists(&ns)
            .await
            .expect("Failed to check namespace"));
        let table = catalog
            .load_table(&table_ident)
            .await
            .expect("Failed to load table");
        assert_eq!(
            table.metadata().current_schema().as_struct().fields().len(),
            3
        );
    }
}
/// Creating a table must write its metadata JSON to the local filesystem,
/// at the location recorded on the table — and, since no explicit location
/// was requested, that file must live under the warehouse directory.
///
/// (The original computed `_warehouse_path` but never used it; the
/// containment assertion below is the evidently intended check.)
#[tokio::test]
async fn test_metadata_file_written_to_warehouse() {
    let (catalog, temp) = create_test_catalog().await;
    let warehouse_path = temp.path().join("warehouse");
    let ns = NamespaceIdent::new("file_check_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let creation = TableCreation::builder()
        .name("file_check_table".to_string())
        .schema(test_schema())
        .build();
    let table = catalog
        .create_table(&ns, creation)
        .await
        .expect("Failed to create table");
    // The catalog records where it wrote the metadata file.
    let metadata_location = table
        .metadata_location()
        .expect("Table should have metadata location");
    // Turn the file:// URI into a plain filesystem path.
    let file_path = metadata_location
        .strip_prefix("file://")
        .unwrap_or(metadata_location);
    assert!(
        std::path::Path::new(file_path).exists(),
        "Metadata file should exist at: {}",
        file_path
    );
    // No explicit location was given, so the default table location — and
    // hence its metadata file — must sit under the warehouse directory.
    assert!(
        std::path::Path::new(file_path).starts_with(&warehouse_path),
        "Metadata file {} should be under warehouse {}",
        file_path,
        warehouse_path.display()
    );
    // The file must contain well-formed JSON (the Iceberg table metadata).
    let content = std::fs::read_to_string(file_path).expect("Failed to read metadata file");
    let _: serde_json::Value =
        serde_json::from_str(&content).expect("Metadata should be valid JSON");
}
/// Loading a table that was never created must fail, even when its
/// namespace exists.
#[tokio::test]
async fn test_load_nonexistent_table_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("error_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    let missing = TableIdent::new(ns, "nonexistent".to_string());
    assert!(catalog.load_table(&missing).await.is_err());
}
/// Fetching a namespace that was never created must error out.
#[tokio::test]
async fn test_get_nonexistent_namespace_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("does_not_exist".to_string());
    assert!(catalog.get_namespace(&ns).await.is_err());
}
/// Dropping a namespace that was never created must error out.
#[tokio::test]
async fn test_drop_nonexistent_namespace_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("ghost_db".to_string());
    assert!(catalog.drop_namespace(&ns).await.is_err());
}
/// Dropping a table that was never created must fail, even when its
/// namespace exists.
#[tokio::test]
async fn test_drop_nonexistent_table_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("ghost_table_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    let missing = TableIdent::new(ns, "ghost_table".to_string());
    assert!(catalog.drop_table(&missing).await.is_err());
}
/// Renaming fails when the source table does not exist.
#[tokio::test]
async fn test_rename_nonexistent_source_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("rename_fail_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Neither identifier refers to a real table.
    let src = TableIdent::new(ns.clone(), "source".to_string());
    let dest = TableIdent::new(ns, "dest".to_string());
    assert!(catalog.rename_table(&src, &dest).await.is_err());
}
/// Renaming onto a destination name that is already occupied must be
/// rejected.
#[tokio::test]
async fn test_rename_to_existing_fails() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("rename_conflict_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Both the source and the destination table already exist.
    for name in ["source_table", "dest_table"] {
        catalog
            .create_table(
                &ns,
                TableCreation::builder()
                    .name(name.to_string())
                    .schema(test_schema())
                    .build(),
            )
            .await
            .expect("Failed to create table");
    }

    let src = TableIdent::new(ns.clone(), "source_table".to_string());
    let dest = TableIdent::new(ns, "dest_table".to_string());
    assert!(catalog.rename_table(&src, &dest).await.is_err());
}
/// A freshly created table must already point at a metadata file when
/// loaded back from the catalog.
#[tokio::test]
async fn test_table_version_initialized_on_create() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("version_test_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("version_table".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");

    let table = catalog
        .load_table(&TableIdent::new(ns, "version_table".to_string()))
        .await
        .expect("Failed to load table");
    assert!(table.metadata_location().is_some());
}
/// A rename must not rewrite the table's metadata file: the location
/// recorded before the rename is identical after it, and the source
/// identifier disappears.
#[tokio::test]
async fn test_atomic_rename_preserves_metadata() {
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("atomic_rename_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let original_table = catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("atomic_source".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");

    // Remember where the metadata file lived before the rename.
    let original_location = original_table
        .metadata_location()
        .expect("Table has no metadata location")
        .to_string();

    let src = TableIdent::new(ns.clone(), "atomic_source".to_string());
    let dest = TableIdent::new(ns.clone(), "atomic_dest".to_string());
    catalog
        .rename_table(&src, &dest)
        .await
        .expect("Failed to rename table");

    // The renamed table still points at the exact same metadata file.
    let renamed_table = catalog
        .load_table(&dest)
        .await
        .expect("Failed to load renamed table");
    assert_eq!(
        renamed_table.metadata_location().unwrap(),
        original_location,
        "Metadata location should be preserved after atomic rename"
    );
    assert!(
        !catalog.table_exists(&src).await.expect("Failed to check"),
        "Source table should not exist after rename"
    );
}
/// Loads the same table twice and then performs two commits of the
/// "client" property, asserting the last write wins.
///
/// NOTE(review): despite the name, the two commits here run sequentially
/// through the catalog and both are expected to succeed — the two loaded
/// handles (`_table_a`, `_table_b`) are never used, so no stale-snapshot
/// conflict is actually provoked or asserted. Consider committing from a
/// stale handle to verify real conflict detection.
#[tokio::test]
async fn test_concurrent_commit_conflict_detection() {
    use iceberg::TableUpdate;
    use rustberg::catalog::CatalogExt;
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("concurrent_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let creation = TableCreation::builder()
        .name("concurrent_table".to_string())
        .schema(test_schema())
        .build();
    catalog
        .create_table(&ns, creation)
        .await
        .expect("Failed to create table");
    let table_ident = TableIdent::new(ns.clone(), "concurrent_table".to_string());
    // Two independent loads simulating two clients; currently unused.
    let _table_a = catalog
        .load_table(&table_ident)
        .await
        .expect("Failed to load table for client A");
    let _table_b = catalog
        .load_table(&table_ident)
        .await
        .expect("Failed to load table for client B");
    // Client A sets client=A.
    let update_a = TableUpdate::SetProperties {
        updates: [("client".to_string(), "A".to_string())]
            .into_iter()
            .collect(),
    };
    let result_a = catalog
        .commit_table(&table_ident, vec![], vec![update_a])
        .await;
    assert!(result_a.is_ok(), "Client A commit should succeed");
    // Client B then sets client=B; no requirements are attached, so this
    // sequential commit is also expected to succeed.
    let update_b = TableUpdate::SetProperties {
        updates: [("client".to_string(), "B".to_string())]
            .into_iter()
            .collect(),
    };
    let result_b = catalog
        .commit_table(&table_ident, vec![], vec![update_b])
        .await;
    assert!(result_b.is_ok(), "Sequential commit should succeed");
    // Last writer wins.
    let final_table = catalog.load_table(&table_ident).await.expect("Load failed");
    let props = final_table.metadata().properties();
    assert_eq!(
        props.get("client"),
        Some(&"B".to_string()),
        "Final value should be from client B"
    );
}
/// Five back-to-back commits of the same property each succeed, and the
/// last committed value is the one visible afterwards.
#[tokio::test]
async fn test_version_increments_on_commit() {
    use iceberg::TableUpdate;
    use rustberg::catalog::CatalogExt;
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("version_incr_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("version_table".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");
    let table_ident = TableIdent::new(ns.clone(), "version_table".to_string());

    // Commit five property updates in a row; each one must succeed.
    for i in 1..=5 {
        let update = TableUpdate::SetProperties {
            updates: HashMap::from([("iteration".to_string(), i.to_string())]),
        };
        catalog
            .commit_table(&table_ident, vec![], vec![update])
            .await
            .unwrap_or_else(|e| panic!("Commit {} should succeed: {}", i, e));
    }

    // The final commit's value wins.
    let final_table = catalog.load_table(&table_ident).await.expect("Load failed");
    assert_eq!(
        final_table.metadata().properties().get("iteration"),
        Some(&"5".to_string()),
        "Final iteration should be 5"
    );
}
/// One atomic batch updating two tables: both commits land, and each table
/// ends up with its own property value.
#[tokio::test]
async fn test_atomic_multi_table_commit() {
    use iceberg::TableUpdate;
    use rustberg::catalog::CatalogExt;
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("atomic_test_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");

    // Two independent tables that will be committed together.
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("table_a".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table_a");
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("table_b".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table_b");
    let table_a = TableIdent::new(ns.clone(), "table_a".to_string());
    let table_b = TableIdent::new(ns.clone(), "table_b".to_string());

    // A single atomic batch carrying a distinct value per table.
    let update_a = TableUpdate::SetProperties {
        updates: HashMap::from([("atomic_property".to_string(), "value_a".to_string())]),
    };
    let update_b = TableUpdate::SetProperties {
        updates: HashMap::from([("atomic_property".to_string(), "value_b".to_string())]),
    };
    let results = catalog
        .commit_tables_atomic(vec![
            (table_a.clone(), vec![], vec![update_a]),
            (table_b.clone(), vec![], vec![update_b]),
        ])
        .await
        .expect("Atomic commit should succeed");
    assert_eq!(results.len(), 2, "Should return 2 updated tables");

    // Each table received exactly its own value.
    let final_a = catalog
        .load_table(&table_a)
        .await
        .expect("Load table_a failed");
    let final_b = catalog
        .load_table(&table_b)
        .await
        .expect("Load table_b failed");
    assert_eq!(
        final_a.metadata().properties().get("atomic_property"),
        Some(&"value_a".to_string()),
        "table_a should have atomic_property set"
    );
    assert_eq!(
        final_b.metadata().properties().get("atomic_property"),
        Some(&"value_b".to_string()),
        "table_b should have atomic_property set"
    );
}
/// An atomic multi-table commit where one table carries an unsatisfiable
/// requirement (a UUID that cannot match) must fail as a whole: neither
/// table may be updated, even though the other table's changes were valid
/// on their own.
#[tokio::test]
async fn test_atomic_commit_validates_all_requirements_first() {
    use iceberg::{TableRequirement, TableUpdate};
    use rustberg::catalog::CatalogExt;
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("atomic_reqs_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let creation1 = TableCreation::builder()
        .name("table_c".to_string())
        .schema(test_schema())
        .build();
    let creation2 = TableCreation::builder()
        .name("table_d".to_string())
        .schema(test_schema())
        .build();
    catalog
        .create_table(&ns, creation1)
        .await
        .expect("Failed to create table_c");
    catalog
        .create_table(&ns, creation2)
        .await
        .expect("Failed to create table_d");
    let table_c = TableIdent::new(ns.clone(), "table_c".to_string());
    let table_d = TableIdent::new(ns.clone(), "table_d".to_string());
    // Both updates set a property that must NOT appear after the failed commit.
    let update_c = TableUpdate::SetProperties {
        updates: [("should_not_be_set".to_string(), "value_c".to_string())]
            .into_iter()
            .collect(),
    };
    let update_d = TableUpdate::SetProperties {
        updates: [("should_not_be_set".to_string(), "value_d".to_string())]
            .into_iter()
            .collect(),
    };
    // A fixed UUID that cannot match the randomly generated table UUID, so
    // this requirement is guaranteed to fail validation.
    let invalid_req = TableRequirement::UuidMatch {
        uuid: uuid::Uuid::parse_str("11111111-1111-1111-1111-111111111111").unwrap(),
    };
    // table_c has no requirements; table_d carries the failing one.
    let table_changes = vec![
        (table_c.clone(), vec![], vec![update_c]),
        (table_d.clone(), vec![invalid_req], vec![update_d]),
    ];
    let result = catalog.commit_tables_atomic(table_changes).await;
    assert!(
        result.is_err(),
        "Commit should fail due to invalid requirement on table_d"
    );
    // All-or-nothing: neither table may show the property.
    let final_c = catalog
        .load_table(&table_c)
        .await
        .expect("Load table_c failed");
    let final_d = catalog
        .load_table(&table_d)
        .await
        .expect("Load table_d failed");
    assert!(
        final_c
            .metadata()
            .properties()
            .get("should_not_be_set")
            .is_none(),
        "table_c should NOT have been updated since table_d's requirement failed"
    );
    assert!(
        final_d
            .metadata()
            .properties()
            .get("should_not_be_set")
            .is_none(),
        "table_d should NOT have been updated since its requirement failed"
    );
}
/// A one-element batch through `commit_tables_atomic` succeeds and applies
/// the update, exercising the single-table path of the atomic API.
#[tokio::test]
async fn test_atomic_commit_single_table_fast_path() {
    use iceberg::TableUpdate;
    use rustberg::catalog::CatalogExt;
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("fast_path_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    catalog
        .create_table(
            &ns,
            TableCreation::builder()
                .name("single_table".to_string())
                .schema(test_schema())
                .build(),
        )
        .await
        .expect("Failed to create table");
    let table_ident = TableIdent::new(ns.clone(), "single_table".to_string());

    let update = TableUpdate::SetProperties {
        updates: HashMap::from([("fast_path".to_string(), "success".to_string())]),
    };
    let results = catalog
        .commit_tables_atomic(vec![(table_ident.clone(), vec![], vec![update])])
        .await
        .expect("Single-table atomic commit should succeed");
    assert_eq!(results.len(), 1);

    // The property landed on the table.
    let final_table = catalog.load_table(&table_ident).await.expect("Load failed");
    assert_eq!(
        final_table.metadata().properties().get("fast_path"),
        Some(&"success".to_string())
    );
}
/// Three tasks, released simultaneously by a barrier, each atomically
/// commit to an overlapping pair of tables (0+1, 1+2, 2+3). Transactions
/// may conflict and fail; the test requires at least two of the three to
/// succeed and every table to end up with a `txn` property.
#[tokio::test]
async fn test_concurrent_atomic_multi_table_commits() {
    use iceberg::TableUpdate;
    use rustberg::catalog::CatalogExt;
    use std::sync::Arc;
    use tokio::sync::Barrier;
    let (catalog, _temp) = create_test_catalog().await;
    // Shared across spawned tasks.
    let catalog = Arc::new(catalog);
    let ns = NamespaceIdent::new("concurrent_atomic_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    // Four tables: table_0 .. table_3.
    for i in 0..4 {
        let creation = TableCreation::builder()
            .name(format!("table_{}", i))
            .schema(test_schema())
            .build();
        catalog
            .create_table(&ns, creation)
            .await
            .unwrap_or_else(|e| panic!("Failed to create table_{}: {}", i, e));
    }
    // All three tasks start their commits at the same moment.
    let barrier = Arc::new(Barrier::new(3));
    let mut handles = Vec::new();
    // Task 1: atomic commit touching table_0 and table_1 (txn=1).
    {
        let catalog = Arc::clone(&catalog);
        let barrier = Arc::clone(&barrier);
        let ns = ns.clone();
        handles.push(tokio::spawn(async move {
            barrier.wait().await;
            let table_0 = TableIdent::new(ns.clone(), "table_0".to_string());
            let table_1 = TableIdent::new(ns.clone(), "table_1".to_string());
            let update_0 = TableUpdate::SetProperties {
                updates: [("txn".to_string(), "1".to_string())].into_iter().collect(),
            };
            let update_1 = TableUpdate::SetProperties {
                updates: [("txn".to_string(), "1".to_string())].into_iter().collect(),
            };
            catalog
                .commit_tables_atomic(vec![
                    (table_0, vec![], vec![update_0]),
                    (table_1, vec![], vec![update_1]),
                ])
                .await
        }));
    }
    // Task 2: atomic commit touching table_1 and table_2 (txn=2);
    // overlaps task 1 on table_1.
    {
        let catalog = Arc::clone(&catalog);
        let barrier = Arc::clone(&barrier);
        let ns = ns.clone();
        handles.push(tokio::spawn(async move {
            barrier.wait().await;
            let table_1 = TableIdent::new(ns.clone(), "table_1".to_string());
            let table_2 = TableIdent::new(ns.clone(), "table_2".to_string());
            let update_1 = TableUpdate::SetProperties {
                updates: [("txn".to_string(), "2".to_string())].into_iter().collect(),
            };
            let update_2 = TableUpdate::SetProperties {
                updates: [("txn".to_string(), "2".to_string())].into_iter().collect(),
            };
            catalog
                .commit_tables_atomic(vec![
                    (table_1, vec![], vec![update_1]),
                    (table_2, vec![], vec![update_2]),
                ])
                .await
        }));
    }
    // Task 3: atomic commit touching table_2 and table_3 (txn=3);
    // overlaps task 2 on table_2.
    {
        let catalog = Arc::clone(&catalog);
        let barrier = Arc::clone(&barrier);
        let ns = ns.clone();
        handles.push(tokio::spawn(async move {
            barrier.wait().await;
            let table_2 = TableIdent::new(ns.clone(), "table_2".to_string());
            let table_3 = TableIdent::new(ns.clone(), "table_3".to_string());
            let update_2 = TableUpdate::SetProperties {
                updates: [("txn".to_string(), "3".to_string())].into_iter().collect(),
            };
            let update_3 = TableUpdate::SetProperties {
                updates: [("txn".to_string(), "3".to_string())].into_iter().collect(),
            };
            catalog
                .commit_tables_atomic(vec![
                    (table_2, vec![], vec![update_2]),
                    (table_3, vec![], vec![update_3]),
                ])
                .await
        }));
    }
    // Conflicting transactions are allowed to fail; count the successes.
    let mut success_count = 0;
    for handle in handles {
        match handle.await {
            Ok(Ok(_)) => success_count += 1,
            Ok(Err(e)) => {
                tracing::warn!("Transaction failed: {}", e);
            }
            Err(e) => panic!("Task panicked: {}", e),
        }
    }
    // The overlap graph (0+1, 1+2, 2+3) allows at most one pairwise
    // conflict chain, so at least two transactions should go through.
    assert!(
        success_count >= 2,
        "Expected at least 2 successful transactions, got {}",
        success_count
    );
    // Every table is covered by at least one successful transaction.
    for i in 0..4 {
        let table_ident = TableIdent::new(ns.clone(), format!("table_{}", i));
        let table = catalog
            .load_table(&table_ident)
            .await
            .unwrap_or_else(|e| panic!("Load table_{} failed: {}", i, e));
        let props = table.metadata().properties();
        assert!(
            props.get("txn").is_some(),
            "table_{} should have txn property",
            i
        );
    }
}
/// Sanity check: a single-table atomic commit against a pre-existing table
/// succeeds and the committed property is visible afterwards.
///
/// NOTE(review): the name suggests a new table is created mid-transaction,
/// but no such step exists in the body — only one table is ever created,
/// before the commit. Either extend the test to actually create a table
/// during the transaction, or rename it to match what it checks.
#[tokio::test]
async fn test_atomic_commit_with_new_table_during_transaction() {
    use iceberg::TableUpdate;
    use rustberg::catalog::CatalogExt;
    let (catalog, _temp) = create_test_catalog().await;
    let ns = NamespaceIdent::new("new_table_during_txn_db".to_string());
    catalog
        .create_namespace(&ns, HashMap::new())
        .await
        .expect("Failed to create namespace");
    let creation1 = TableCreation::builder()
        .name("existing_table".to_string())
        .schema(test_schema())
        .build();
    catalog
        .create_table(&ns, creation1)
        .await
        .expect("Failed to create existing_table");
    let table_ident = TableIdent::new(ns.clone(), "existing_table".to_string());
    // Single-table atomic commit setting one property.
    let update = TableUpdate::SetProperties {
        updates: [("concurrent_test".to_string(), "initial".to_string())]
            .into_iter()
            .collect(),
    };
    let result = catalog
        .commit_tables_atomic(vec![(table_ident.clone(), vec![], vec![update])])
        .await;
    assert!(result.is_ok(), "Commit should succeed: {:?}", result.err());
    // The committed property is visible on reload.
    let final_table = catalog.load_table(&table_ident).await.expect("Load failed");
    assert_eq!(
        final_table.metadata().properties().get("concurrent_test"),
        Some(&"initial".to_string())
    );
}