use std::fmt::Debug;
use async_trait::async_trait;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent, TableRequirement, TableUpdate};
/// Extension trait layering commit, metadata-pointer, multi-table, and
/// health-check operations on top of the base [`Catalog`] API.
#[async_trait]
pub trait CatalogExt: Catalog {
    /// Validates `requirements` against the table's current metadata,
    /// applies `updates`, and persists the result as a new metadata version.
    ///
    /// Returns the table refreshed to the newly committed metadata.
    async fn commit_table(
        &self,
        table_ident: &TableIdent,
        requirements: Vec<TableRequirement>,
        updates: Vec<TableUpdate>,
    ) -> Result<Table>;

    /// Points `table_ident` at an already-written metadata file at
    /// `new_metadata_location` and returns the refreshed table.
    async fn update_table_metadata_location(
        &self,
        table_ident: &TableIdent,
        new_metadata_location: String,
    ) -> Result<Table>;

    /// Commits changes to several tables. Implementations that cannot make
    /// the commit atomic across tables may reject multi-table batches with
    /// an error instead of committing partially.
    async fn commit_tables_atomic(
        &self,
        table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
    ) -> Result<Vec<Table>>;

    /// Probes the underlying storage backend and reports its health.
    async fn storage_health_check(&self) -> Result<StorageHealthStatus>;
}
/// Result of a storage backend health probe.
#[derive(Debug, Clone)]
pub struct StorageHealthStatus {
    /// Identifier for the storage backend that was probed (e.g. "memory").
    pub backend_type: String,
    /// Whether the probe succeeded.
    pub healthy: bool,
    /// Observed probe latency in milliseconds; 0 when the probe failed.
    pub latency_ms: u64,
    /// Failure detail; `None` when the backend is healthy.
    pub message: Option<String>,
}
impl StorageHealthStatus {
pub fn healthy(backend_type: impl Into<String>, latency_ms: u64) -> Self {
Self {
backend_type: backend_type.into(),
healthy: true,
latency_ms,
message: None,
}
}
pub fn unhealthy(backend_type: impl Into<String>, message: impl Into<String>) -> Self {
Self {
backend_type: backend_type.into(),
healthy: false,
latency_ms: 0,
message: Some(message.into()),
}
}
}
/// Wrapper that adds the [`CatalogExt`] operations to any base [`Catalog`],
/// delegating all standard catalog calls to the wrapped implementation.
#[derive(Debug)]
pub struct ExtendedCatalog<C: Catalog> {
    // The wrapped catalog every base operation is forwarded to.
    inner: C,
}
impl<C: Catalog> ExtendedCatalog<C> {
pub fn new(inner: C) -> Self {
Self { inner }
}
}
/// Blanket pass-through: every base-catalog operation delegates to the
/// wrapped `inner` catalog unchanged, so `ExtendedCatalog` is a drop-in
/// replacement wherever a plain [`Catalog`] is expected.
#[async_trait]
impl<C: Catalog + Send + Sync> Catalog for ExtendedCatalog<C> {
    async fn list_namespaces(
        &self,
        parent: Option<&iceberg::NamespaceIdent>,
    ) -> Result<Vec<iceberg::NamespaceIdent>> {
        self.inner.list_namespaces(parent).await
    }

    async fn create_namespace(
        &self,
        namespace: &iceberg::NamespaceIdent,
        properties: std::collections::HashMap<String, String>,
    ) -> Result<iceberg::Namespace> {
        self.inner.create_namespace(namespace, properties).await
    }

    async fn get_namespace(
        &self,
        namespace: &iceberg::NamespaceIdent,
    ) -> Result<iceberg::Namespace> {
        self.inner.get_namespace(namespace).await
    }

    async fn namespace_exists(&self, namespace: &iceberg::NamespaceIdent) -> Result<bool> {
        self.inner.namespace_exists(namespace).await
    }

    async fn update_namespace(
        &self,
        namespace: &iceberg::NamespaceIdent,
        properties: std::collections::HashMap<String, String>,
    ) -> Result<()> {
        self.inner.update_namespace(namespace, properties).await
    }

    async fn drop_namespace(&self, namespace: &iceberg::NamespaceIdent) -> Result<()> {
        self.inner.drop_namespace(namespace).await
    }

    async fn list_tables(&self, namespace: &iceberg::NamespaceIdent) -> Result<Vec<TableIdent>> {
        self.inner.list_tables(namespace).await
    }

    async fn create_table(
        &self,
        namespace: &iceberg::NamespaceIdent,
        creation: iceberg::TableCreation,
    ) -> Result<Table> {
        self.inner.create_table(namespace, creation).await
    }

    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
        self.inner.load_table(table).await
    }

    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
        self.inner.drop_table(table).await
    }

    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
        self.inner.table_exists(table).await
    }

    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
        self.inner.rename_table(src, dest).await
    }

    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table> {
        self.inner.register_table(table, metadata_location).await
    }

    async fn update_table(&self, commit: iceberg::TableCommit) -> Result<Table> {
        self.inner.update_table(commit).await
    }
}
#[async_trait]
impl<C: Catalog + Send + Sync> CatalogExt for ExtendedCatalog<C> {
    /// Optimistic commit: load the table, validate the caller's requirements
    /// against the freshly loaded metadata, apply the updates via the
    /// metadata builder, write the result to a new metadata file, then swap
    /// the catalog pointer to that file.
    async fn commit_table(
        &self,
        table_ident: &TableIdent,
        requirements: Vec<TableRequirement>,
        updates: Vec<TableUpdate>,
    ) -> Result<Table> {
        let table = self.inner.load_table(table_ident).await?;
        // Requirements encode the caller's assumptions about the current
        // table state; a failed check aborts before anything is written.
        for requirement in &requirements {
            requirement.check(Some(table.metadata()))?;
        }
        let current_metadata_location = table
            .metadata_location()
            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Table has no metadata location"))?;
        let mut metadata_builder = table
            .metadata()
            .clone()
            .into_builder(Some(current_metadata_location.to_string()));
        // Updates are applied in order; any rejected update aborts the commit.
        for update in updates {
            metadata_builder = update.apply(metadata_builder)?;
        }
        let new_metadata = metadata_builder.build()?;
        // New file gets a bumped version counter plus a fresh UUID, so it
        // never collides with the current metadata file.
        let new_metadata_location = generate_new_metadata_location(current_metadata_location)?;
        new_metadata
            .metadata
            .write_to(table.file_io(), &new_metadata_location)
            .await?;
        self.update_table_metadata_location(table_ident, new_metadata_location)
            .await
    }

    /// Re-points the catalog entry at `new_metadata_location` using a
    /// drop-and-register sequence. NOTE: this is NOT atomic — there is a
    /// window after the drop in which the table does not exist, and if the
    /// re-registration fails the catalog entry is lost (the metadata file
    /// itself survives and can be re-registered manually).
    async fn update_table_metadata_location(
        &self,
        table_ident: &TableIdent,
        new_metadata_location: String,
    ) -> Result<Table> {
        tracing::debug!(
            table = %table_ident,
            new_location = %new_metadata_location,
            "Updating table metadata location using drop-and-register pattern"
        );
        self.inner.drop_table(table_ident).await?;
        match self
            .inner
            .register_table(table_ident, new_metadata_location.clone())
            .await
        {
            Ok(table) => Ok(table),
            Err(e) => {
                // Log everything an operator needs for manual recovery,
                // then surface the original registration error.
                tracing::error!(
                    table = %table_ident,
                    metadata_location = %new_metadata_location,
                    error = %e,
                    "CRITICAL: Table dropped but re-registration failed. \
                    Table entry is lost. Metadata file still exists at the \
                    specified location and can be manually recovered."
                );
                Err(e)
            }
        }
    }

    /// Cross-table atomicity is not supported by this wrapper: batches of
    /// zero or one table are committed directly (no atomicity needed);
    /// larger batches are rejected rather than committed partially.
    async fn commit_tables_atomic(
        &self,
        table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
    ) -> Result<Vec<Table>> {
        if table_changes.len() <= 1 {
            let mut results = Vec::with_capacity(table_changes.len());
            for (ident, reqs, updates) in table_changes {
                let table = self.commit_table(&ident, reqs, updates).await?;
                results.push(table);
            }
            return Ok(results);
        }
        Err(Error::new(
            ErrorKind::FeatureUnsupported,
            format!(
                "Atomic multi-table commit ({} tables) not supported by this catalog backend. \
                Enable slatedb-storage feature and use a persistent storage backend for true atomic commits.",
                table_changes.len()
            ),
        ))
    }

    /// Always reports a healthy zero-latency backend.
    // NOTE(review): the backend type is hard-coded to "memory" regardless of
    // the wrapped catalog `C` — confirm this matches the intended deployment,
    // or derive the label from the inner catalog.
    async fn storage_health_check(&self) -> Result<StorageHealthStatus> {
        Ok(StorageHealthStatus::healthy("memory", 0))
    }
}
/// Derives the location of the next metadata file from `current_location`.
///
/// The directory portion is preserved, the leading version counter of the
/// filename is incremented (falling back to 0 when it cannot be parsed),
/// and a fresh UUID is appended, yielding `NNNNN-<uuid>.metadata.json`.
fn generate_new_metadata_location(current_location: &str) -> Result<String> {
    use std::path::Path;

    let current_path = Path::new(current_location);
    let file_name = current_path
        .file_name()
        .and_then(|f| f.to_str())
        .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Invalid metadata location"))?;

    // Metadata filenames look like `00001-<uuid>.metadata.json`; the piece
    // before the first '-' is the version counter.
    let current_version = file_name
        .split('-')
        .next()
        .and_then(|prefix| prefix.parse::<u32>().ok())
        .unwrap_or(0);
    let next_file_name = format!(
        "{:05}-{}.metadata.json",
        current_version + 1,
        uuid::Uuid::new_v4()
    );

    let directory = current_path.parent().map(|p| p.to_string_lossy().to_string());
    Ok(match directory {
        Some(dir) if !dir.is_empty() => format!("{}/{}", dir, next_file_name),
        _ => next_file_name,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The version counter is incremented and the s3 prefix is preserved.
    #[test]
    fn test_generate_new_metadata_location() {
        let source = "s3://bucket/warehouse/db/table/metadata/00001-abc.metadata.json";
        let next = generate_new_metadata_location(source).unwrap();
        assert!(next.starts_with("s3://bucket/warehouse/db/table/metadata/00002-"));
        assert!(next.ends_with(".metadata.json"));
    }

    /// Plain filesystem paths are handled the same way as object-store URIs.
    #[test]
    fn test_generate_new_metadata_location_local() {
        let source = "/tmp/warehouse/metadata/00005-xyz.metadata.json";
        let next = generate_new_metadata_location(source).unwrap();
        assert!(next.starts_with("/tmp/warehouse/metadata/00006-"));
        assert!(next.ends_with(".metadata.json"));
    }
}