use std::fmt::Debug;
use async_trait::async_trait;
use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, Result, TableIdent, TableRequirement, TableUpdate};
/// Extension operations layered on top of the base [`Catalog`] trait.
///
/// Any implementor must also implement [`Catalog`]. The implementation for
/// `ExtendedCatalog` in this file realizes both methods using only base
/// `Catalog` calls (`load_table`, `drop_table`, `register_table`).
#[async_trait]
pub trait CatalogExt: Catalog {
/// Commits `updates` to the table identified by `table_ident`.
///
/// `requirements` are the preconditions supplied by the caller; the
/// `ExtendedCatalog` implementation in this file checks each of them against
/// the currently loaded table metadata before applying any update.
///
/// Returns the table as reported by the catalog after the commit, or the
/// first error encountered.
async fn commit_table(
&self,
table_ident: &TableIdent,
requirements: Vec<TableRequirement>,
updates: Vec<TableUpdate>,
) -> Result<Table>;
/// Re-points the catalog entry for `table_ident` at `new_metadata_location`.
///
/// Returns the table as reported by the catalog for the new location.
async fn update_table_metadata_location(
&self,
table_ident: &TableIdent,
new_metadata_location: String,
) -> Result<Table>;
}
/// Wrapper around a catalog `C`.
///
/// In this file the wrapper implements `Catalog` by forwarding every call to
/// `inner`, and implements `CatalogExt` on top of those forwarded operations.
#[derive(Debug)]
pub struct ExtendedCatalog<C: Catalog> {
// The wrapped catalog that every operation is ultimately delegated to.
inner: C,
}
impl<C: Catalog> ExtendedCatalog<C> {
pub fn new(inner: C) -> Self {
Self { inner }
}
}
/// `Catalog` implementation that forwards every operation, unchanged, to the
/// wrapped catalog.
#[async_trait]
impl<C> Catalog for ExtendedCatalog<C>
where
    C: Catalog + Send + Sync,
{
    /// Forwards to the wrapped catalog's `list_namespaces`.
    async fn list_namespaces(
        &self,
        parent: Option<&iceberg::NamespaceIdent>,
    ) -> Result<Vec<iceberg::NamespaceIdent>> {
        C::list_namespaces(&self.inner, parent).await
    }

    /// Forwards to the wrapped catalog's `create_namespace`.
    async fn create_namespace(
        &self,
        namespace: &iceberg::NamespaceIdent,
        properties: std::collections::HashMap<String, String>,
    ) -> Result<iceberg::Namespace> {
        C::create_namespace(&self.inner, namespace, properties).await
    }

    /// Forwards to the wrapped catalog's `get_namespace`.
    async fn get_namespace(
        &self,
        namespace: &iceberg::NamespaceIdent,
    ) -> Result<iceberg::Namespace> {
        C::get_namespace(&self.inner, namespace).await
    }

    /// Forwards to the wrapped catalog's `namespace_exists`.
    async fn namespace_exists(&self, namespace: &iceberg::NamespaceIdent) -> Result<bool> {
        C::namespace_exists(&self.inner, namespace).await
    }

    /// Forwards to the wrapped catalog's `update_namespace`.
    async fn update_namespace(
        &self,
        namespace: &iceberg::NamespaceIdent,
        properties: std::collections::HashMap<String, String>,
    ) -> Result<()> {
        C::update_namespace(&self.inner, namespace, properties).await
    }

    /// Forwards to the wrapped catalog's `drop_namespace`.
    async fn drop_namespace(&self, namespace: &iceberg::NamespaceIdent) -> Result<()> {
        C::drop_namespace(&self.inner, namespace).await
    }

    /// Forwards to the wrapped catalog's `list_tables`.
    async fn list_tables(&self, namespace: &iceberg::NamespaceIdent) -> Result<Vec<TableIdent>> {
        C::list_tables(&self.inner, namespace).await
    }

    /// Forwards to the wrapped catalog's `create_table`.
    async fn create_table(
        &self,
        namespace: &iceberg::NamespaceIdent,
        creation: iceberg::TableCreation,
    ) -> Result<Table> {
        C::create_table(&self.inner, namespace, creation).await
    }

    /// Forwards to the wrapped catalog's `load_table`.
    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
        C::load_table(&self.inner, table).await
    }

    /// Forwards to the wrapped catalog's `drop_table`.
    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
        C::drop_table(&self.inner, table).await
    }

    /// Forwards to the wrapped catalog's `table_exists`.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
        C::table_exists(&self.inner, table).await
    }

    /// Forwards to the wrapped catalog's `rename_table`.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
        C::rename_table(&self.inner, src, dest).await
    }

    /// Forwards to the wrapped catalog's `register_table`.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table> {
        C::register_table(&self.inner, table, metadata_location).await
    }

    /// Forwards to the wrapped catalog's `update_table`.
    async fn update_table(&self, commit: iceberg::TableCommit) -> Result<Table> {
        C::update_table(&self.inner, commit).await
    }
}
#[async_trait]
impl<C: Catalog + Send + Sync> CatalogExt for ExtendedCatalog<C> {
    /// Applies `updates` to a table and re-points the catalog at the resulting
    /// metadata file.
    ///
    /// Steps, in order:
    /// 1. load the table from the wrapped catalog;
    /// 2. check every entry of `requirements` against the loaded metadata;
    /// 3. apply every entry of `updates` to a builder seeded from that metadata;
    /// 4. write the built metadata to a freshly generated location next to the
    ///    current metadata file;
    /// 5. drop and re-register the table so it points at the new file.
    ///
    /// The requirement check and the final swap are separate catalog calls, so
    /// this sequence is not atomic with respect to concurrent writers.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::DataInvalid` when the loaded table has no metadata
    /// location, and otherwise propagates the first error from any step. If the
    /// final registration fails, the previous metadata location is re-registered
    /// on a best-effort basis (the outcome is attached to the returned error).
    /// The newly written metadata file is intentionally left in place so it can
    /// be used for manual recovery.
    async fn commit_table(
        &self,
        table_ident: &TableIdent,
        requirements: Vec<TableRequirement>,
        updates: Vec<TableUpdate>,
    ) -> Result<Table> {
        let table = self.inner.load_table(table_ident).await?;

        for requirement in &requirements {
            requirement.check(Some(table.metadata()))?;
        }

        let current_metadata_location = table
            .metadata_location()
            .ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Table has no metadata location"))?;
        let previous_metadata_location = current_metadata_location.to_string();

        let mut metadata_builder = table
            .metadata()
            .clone()
            .into_builder(Some(previous_metadata_location.clone()));
        for update in updates {
            metadata_builder = update.apply(metadata_builder)?;
        }
        let new_metadata = metadata_builder.build()?;

        let new_metadata_location = generate_new_metadata_location(current_metadata_location)?;
        new_metadata
            .metadata
            .write_to(table.file_io(), &new_metadata_location)
            .await?;

        // The previous location is already known here, so hand it to the swap
        // directly instead of loading the table a second time.
        self.swap_metadata_location(
            table_ident,
            Some(previous_metadata_location),
            new_metadata_location,
        )
        .await
    }

    /// Re-points `table_ident` at `new_metadata_location` by dropping the table
    /// and registering it again.
    ///
    /// Before dropping, the current metadata location is looked up so that it
    /// can be restored if the registration fails. That lookup is best effort: if
    /// it fails, the swap proceeds exactly as it would without a rollback target.
    ///
    /// # Errors
    ///
    /// Propagates the error from `drop_table` or `register_table` of the wrapped
    /// catalog. A registration error carries a `rollback` context entry that
    /// describes whether the previous location could be restored.
    async fn update_table_metadata_location(
        &self,
        table_ident: &TableIdent,
        new_metadata_location: String,
    ) -> Result<Table> {
        // Deliberately ignore a failed load: it only means no rollback target is
        // available; `drop_table` below still reports the authoritative error.
        let previous_metadata_location = self
            .inner
            .load_table(table_ident)
            .await
            .ok()
            .and_then(|table| table.metadata_location().map(|loc| loc.to_string()));

        self.swap_metadata_location(
            table_ident,
            previous_metadata_location,
            new_metadata_location,
        )
        .await
    }
}

impl<C: Catalog + Send + Sync> ExtendedCatalog<C> {
    /// Drops `table_ident` and registers it again at `new_metadata_location`,
    /// restoring `previous_metadata_location` (when known) if registration fails.
    ///
    /// The drop and the registration are two independent catalog calls, so a
    /// failure between them would otherwise leave the table unregistered.
    ///
    /// NOTE(review): the restore can only succeed if the wrapped catalog's
    /// `drop_table` leaves the previous metadata file in place - confirm for the
    /// catalog implementations used with this wrapper.
    async fn swap_metadata_location(
        &self,
        table_ident: &TableIdent,
        previous_metadata_location: Option<String>,
        new_metadata_location: String,
    ) -> Result<Table> {
        self.inner.drop_table(table_ident).await?;

        let register_err = match self
            .inner
            .register_table(table_ident, new_metadata_location)
            .await
        {
            Ok(table) => return Ok(table),
            Err(err) => err,
        };

        // Registration failed after the drop succeeded: try to put the old
        // entry back, and always surface the original registration error.
        let rollback_outcome = match previous_metadata_location {
            None => "skipped: previous metadata location unknown".to_string(),
            Some(previous) => {
                match self
                    .inner
                    .register_table(table_ident, previous.clone())
                    .await
                {
                    Ok(_) => format!("restored previous metadata location {}", previous),
                    Err(rollback_err) => format!(
                        "failed to restore previous metadata location {}: {}",
                        previous, rollback_err
                    ),
                }
            }
        };

        Err(register_err.with_context("rollback", rollback_outcome))
    }
}
fn generate_new_metadata_location(current_location: &str) -> Result<String> {
use std::path::Path;
let path = Path::new(current_location);
let parent = path.parent().map(|p| p.to_string_lossy().to_string());
let filename = path
.file_name()
.and_then(|f| f.to_str())
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Invalid metadata location"))?;
let version = filename
.split('-')
.next()
.and_then(|v| v.parse::<u32>().ok())
.unwrap_or(0);
let new_version = version + 1;
let new_uuid = uuid::Uuid::new_v4();
let new_filename = format!("{:05}-{}.metadata.json", new_version, new_uuid);
match parent {
Some(p) if !p.is_empty() => Ok(format!("{}/{}", p, new_filename)),
_ => Ok(new_filename),
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Generates the successor of `current` and checks that it starts with
    /// `expected_prefix` and keeps the `.metadata.json` suffix.
    fn assert_successor(current: &str, expected_prefix: &str) {
        let successor = generate_new_metadata_location(current).unwrap();
        assert!(successor.starts_with(expected_prefix));
        assert!(successor.ends_with(".metadata.json"));
    }

    /// An object-store URI keeps its directory and bumps the version 1 -> 2.
    #[test]
    fn test_generate_new_metadata_location() {
        assert_successor(
            "s3://bucket/warehouse/db/table/metadata/00001-abc.metadata.json",
            "s3://bucket/warehouse/db/table/metadata/00002-",
        );
    }

    /// An absolute local path keeps its directory and bumps the version 5 -> 6.
    #[test]
    fn test_generate_new_metadata_location_local() {
        assert_successor(
            "/tmp/warehouse/metadata/00005-xyz.metadata.json",
            "/tmp/warehouse/metadata/00006-",
        );
    }
}