use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent, TableRequirement, TableUpdate,
};
use serde::{Deserialize, Serialize};
use slatedb::Db;
use super::CatalogExt;
/// Registry record for one namespace, serialized with `serde_json` and stored
/// under the key produced by `SlateCatalog::namespace_key`
/// (`namespace:<parts joined by '.'>`).
///
/// Field names are the JSON keys persisted in SlateDB; renaming them changes
/// the on-disk format.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Namespace parts in hierarchy order. Unlike the '.'-joined key, this is
    // unambiguous when a part itself contains '.'.
    namespace: Vec<String>,
    // Namespace properties as last written by create/update.
    properties: HashMap<String, String>,
}
/// Registry record for one table, serialized with `serde_json` and stored
/// under the key produced by `SlateCatalog::table_key`
/// (`table:<namespace joined by '.'>:<name>`).
///
/// The table's actual metadata lives in the file at `metadata_location`;
/// this record only points at it. Field names are the persisted JSON keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TableRegistryEntry {
    // Namespace parts of the table, in hierarchy order.
    namespace: Vec<String>,
    // Table name within the namespace.
    name: String,
    // Path/URL of the current metadata JSON file, read via `FileIO`.
    metadata_location: String,
}
/// Iceberg catalog whose namespace and table registry live in SlateDB.
///
/// SlateDB holds only small JSON registry records; table metadata files are
/// read and written through `file_io`.
pub struct SlateCatalog {
    /// Normalized warehouse root; default table locations are derived from it.
    warehouse_location: String,
    /// I/O handle used for reading and writing table metadata files.
    file_io: FileIO,
    /// Key-value store holding the `namespace:` and `table:` registry records.
    db: Arc<Db>,
}
impl std::fmt::Debug for SlateCatalog {
    /// Renders only the warehouse location; the database handle and `FileIO`
    /// are elided (shown as `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SlateCatalog");
        builder.field("warehouse_location", &self.warehouse_location);
        builder.finish_non_exhaustive()
    }
}
impl SlateCatalog {
    /// Creates a catalog over `db` using default `FileIO` properties.
    ///
    /// Equivalent to [`SlateCatalog::with_props`] with an empty property map.
    ///
    /// # Errors
    /// Same as [`SlateCatalog::with_props`].
    pub async fn new(db: Arc<Db>, warehouse_location: String) -> Result<Self> {
        Self::with_props(db, warehouse_location, HashMap::new()).await
    }

    /// Creates a catalog over `db`, building its `FileIO` from the normalized
    /// `warehouse_location` plus the extra `props`.
    ///
    /// A local warehouse (plain path or `file://` URL) is made absolute and
    /// created on disk. A location with any other URL scheme is used as-is.
    ///
    /// # Errors
    /// Fails if the current directory cannot be read, the local warehouse
    /// directory cannot be created, or `FileIO` cannot be built for the
    /// location/props.
    pub async fn with_props(
        db: Arc<Db>,
        warehouse_location: String,
        props: HashMap<String, String>,
    ) -> Result<Self> {
        // NOTE(review): directory creation uses blocking std::fs on the async
        // executor; tolerable for a one-off constructor.
        let warehouse_location = Self::normalize_and_ensure_local_directory(&warehouse_location)?;
        let file_io = FileIO::from_path(&warehouse_location)?
            .with_props(props)
            .build()?;
        Ok(Self {
            db,
            file_io,
            warehouse_location,
        })
    }

    /// Splits `location` into `(scheme, rest)` when it starts with a URL
    /// scheme followed by `://`; returns `None` otherwise.
    ///
    /// Schemes must start with an ASCII letter and contain only ASCII
    /// alphanumerics, `+`, `-` or `.`. Single-letter "schemes" are rejected so
    /// Windows drive letters (`C://...`) keep being treated as local paths.
    fn split_url_scheme(location: &str) -> Option<(&str, &str)> {
        let (scheme, rest) = location.split_once("://")?;
        let mut chars = scheme.chars();
        let well_formed = scheme.len() > 1
            && matches!(chars.next(), Some(c) if c.is_ascii_alphabetic())
            && chars.all(|c| c.is_ascii_alphanumeric() || matches!(c, '+' | '-' | '.'));
        well_formed.then_some((scheme, rest))
    }

    /// Normalizes a warehouse location into something `FileIO` accepts.
    ///
    /// - Non-`file` URLs (`s3://`, `s3a://`, `gs://`, `memory://`, ...) are
    ///   returned unchanged.
    /// - Plain paths and `file://` URLs are resolved against the current
    ///   directory, created with `create_dir_all`, and returned as `file://<abs>`.
    ///
    /// # Errors
    /// `Unexpected` if the current directory cannot be read or the directory
    /// cannot be created.
    fn normalize_and_ensure_local_directory(warehouse_location: &str) -> Result<String> {
        let path = match Self::split_url_scheme(warehouse_location) {
            Some((scheme, rest)) if scheme.eq_ignore_ascii_case("file") => rest,
            // Any other scheme is remote/in-memory storage: never turn it into
            // a local directory such as `./s3a:/bucket`.
            Some(_) => return Ok(warehouse_location.to_string()),
            None => warehouse_location,
        };
        let path_ref = std::path::Path::new(path);
        let absolute_path = if path_ref.is_absolute() {
            path_ref.to_path_buf()
        } else {
            std::env::current_dir()
                .map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to get current directory: {}", e),
                    )
                })?
                .join(path)
        };
        std::fs::create_dir_all(&absolute_path).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!(
                    "Failed to create warehouse directory '{}': {}",
                    absolute_path.display(),
                    e
                ),
            )
        })?;
        let normalized = format!("file://{}", absolute_path.display());
        tracing::info!(
            original = %warehouse_location,
            normalized = %normalized,
            "Normalized warehouse location"
        );
        Ok(normalized)
    }

    /// Registry key of a namespace: `namespace:<parts joined by '.'>`.
    ///
    /// NOTE(review): ambiguous when a part contains '.'; the stored
    /// `NamespaceMetadata::namespace` is authoritative. Changing this format
    /// would orphan existing records.
    fn namespace_key(namespace: &NamespaceIdent) -> Vec<u8> {
        format!("namespace:{}", namespace.join(".")).into_bytes()
    }

    /// Registry key of a table: `table:<namespace joined by '.'>:<name>`.
    fn table_key(table: &TableIdent) -> Vec<u8> {
        format!("table:{}:{}", table.namespace.join("."), table.name).into_bytes()
    }

    /// Key prefix shared by all tables of `namespace` (`table:<ns>:`).
    fn table_prefix(namespace: &NamespaceIdent) -> String {
        format!("table:{}:", namespace.join("."))
    }

    /// Default location for a new table: `<warehouse>/<ns parts joined by '/'>/<name>`.
    ///
    /// Trailing slashes on the warehouse are dropped so locations never
    /// contain `//`, except that a bare root such as `memory://` is kept
    /// intact rather than being trimmed to `memory:`.
    fn table_location(&self, table: &TableIdent) -> String {
        let trimmed = self.warehouse_location.trim_end_matches('/');
        let base = if trimmed.ends_with(':') {
            self.warehouse_location.as_str()
        } else {
            trimmed
        };
        format!("{}/{}/{}", base, table.namespace.join("/"), table.name)
    }

    /// Wraps a SlateDB error as an iceberg `Unexpected` error.
    fn convert_error(err: slatedb::Error) -> Error {
        Error::new(ErrorKind::Unexpected, format!("SlateDB error: {}", err))
    }

    /// Builds a `NamespaceIdent` from stored parts.
    ///
    /// # Panics
    /// If `parts` is empty, which `NamespaceIdent::from_vec` rejects; callers
    /// must guarantee at least one part.
    fn make_namespace_ident(parts: Vec<String>) -> NamespaceIdent {
        NamespaceIdent::from_vec(parts).expect("namespace parts should be valid")
    }
}
#[async_trait]
impl Catalog for SlateCatalog {
    /// Lists the direct children of `parent`, or the top-level namespaces
    /// when `parent` is `None`.
    ///
    /// Registry values that do not deserialize as `NamespaceMetadata` are
    /// skipped. The existence of `parent` itself is not checked.
    async fn list_namespaces(
        &self,
        parent: Option<&NamespaceIdent>,
    ) -> Result<Vec<NamespaceIdent>> {
        let prefix = match parent {
            Some(p) => format!("namespace:{}.", p.join(".")),
            None => "namespace:".to_string(),
        };
        // Keys join parts with '.', and a part may contain '.', so the prefix
        // scan can over-match; the stored part list decides membership.
        let wanted_depth = parent.map_or(1, |p| p.len() + 1);
        let mut namespaces = Vec::new();
        let mut iter = self
            .db
            .scan_prefix(prefix.as_bytes())
            .await
            .map_err(Self::convert_error)?;
        while let Some(kv) = iter.next().await.map_err(Self::convert_error)? {
            let Ok(metadata) = serde_json::from_slice::<NamespaceMetadata>(&kv.value) else {
                continue;
            };
            let is_direct_child = metadata.namespace.len() == wanted_depth
                && match parent {
                    Some(p) => metadata.namespace.starts_with(p),
                    None => true,
                };
            if is_direct_child {
                // `wanted_depth >= 1`, so the part list is non-empty here.
                namespaces.push(Self::make_namespace_ident(metadata.namespace));
            }
        }
        Ok(namespaces)
    }

    /// Registers a new namespace with `properties`.
    ///
    /// # Errors
    /// `NamespaceAlreadyExists` if the key is already present. The check and
    /// the write are separate operations, so concurrent creators can race.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<Namespace> {
        let key = Self::namespace_key(namespace);
        if self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_some()
        {
            return Err(Error::new(
                ErrorKind::NamespaceAlreadyExists,
                format!("Namespace already exists: {}", namespace.join(".")),
            ));
        }
        let metadata = NamespaceMetadata {
            namespace: namespace.iter().map(|s| s.to_string()).collect(),
            properties,
        };
        let value = serde_json::to_vec(&metadata).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize namespace metadata: {}", e),
            )
        })?;
        self.db
            .put(&key, &value)
            .await
            .map_err(Self::convert_error)?;
        // Hand the properties back without cloning them up front.
        Ok(Namespace::with_properties(
            namespace.clone(),
            metadata.properties,
        ))
    }

    /// Loads a namespace and its stored properties.
    ///
    /// # Errors
    /// `NamespaceNotFound` if absent; `Unexpected` if the record is corrupt.
    async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
        let key = Self::namespace_key(namespace);
        let value = self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::NamespaceNotFound,
                    format!("Namespace not found: {}", namespace.join(".")),
                )
            })?;
        let metadata: NamespaceMetadata = serde_json::from_slice(&value).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to deserialize namespace metadata: {}", e),
            )
        })?;
        Ok(Namespace::with_properties(
            namespace.clone(),
            metadata.properties,
        ))
    }

    /// Returns whether a registry record exists for `namespace`.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
        let key = Self::namespace_key(namespace);
        Ok(self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_some())
    }

    /// Replaces the namespace's properties wholesale with `properties`.
    ///
    /// # Errors
    /// `NamespaceNotFound` if the namespace is not registered.
    async fn update_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()> {
        let key = Self::namespace_key(namespace);
        if self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_none()
        {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!("Namespace not found: {}", namespace.join(".")),
            ));
        }
        let metadata = NamespaceMetadata {
            namespace: namespace.iter().map(|s| s.to_string()).collect(),
            properties,
        };
        let new_value = serde_json::to_vec(&metadata).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize namespace metadata: {}", e),
            )
        })?;
        self.db
            .put(&key, &new_value)
            .await
            .map_err(Self::convert_error)?;
        Ok(())
    }

    /// Deletes a namespace that has no tables.
    ///
    /// # Errors
    /// `NamespaceNotFound` if absent; `DataInvalid` if any key exists under
    /// the namespace's table prefix. Child namespaces are not checked.
    async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
        let key = Self::namespace_key(namespace);
        if self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_none()
        {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!("Namespace not found: {}", namespace.join(".")),
            ));
        }
        let prefix = Self::table_prefix(namespace);
        let mut iter = self
            .db
            .scan_prefix(prefix.as_bytes())
            .await
            .map_err(Self::convert_error)?;
        // Conservative: any key under the prefix blocks the drop.
        if iter.next().await.map_err(Self::convert_error)?.is_some() {
            return Err(Error::new(
                ErrorKind::DataInvalid,
                format!("Namespace not empty: {}", namespace.join(".")),
            ));
        }
        self.db.delete(&key).await.map_err(Self::convert_error)?;
        Ok(())
    }

    /// Lists tables registered directly under `namespace`.
    ///
    /// # Errors
    /// `NamespaceNotFound` if the namespace is not registered.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
        if !self.namespace_exists(namespace).await? {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!("Namespace not found: {}", namespace.join(".")),
            ));
        }
        let prefix = Self::table_prefix(namespace);
        let mut tables = Vec::new();
        let mut iter = self
            .db
            .scan_prefix(prefix.as_bytes())
            .await
            .map_err(Self::convert_error)?;
        while let Some(kv) = iter.next().await.map_err(Self::convert_error)? {
            let Ok(entry) = serde_json::from_slice::<TableRegistryEntry>(&kv.value) else {
                continue;
            };
            // A namespace whose last part contains ':' (e.g. ["db:x"]) has
            // keys like `table:db:x:t` that also match `table:db:`; keep only
            // tables that were registered under this exact namespace.
            if entry.namespace.iter().eq(namespace.iter()) {
                tables.push(TableIdent::new(namespace.clone(), entry.name));
            }
        }
        Ok(tables)
    }

    /// Creates a table, writes its initial metadata file, and registers it.
    ///
    /// Without an explicit `creation.location`, the table is placed at
    /// `<warehouse>/<namespace parts>/<name>`.
    ///
    /// # Errors
    /// `NamespaceNotFound`, `TableAlreadyExists`, or any metadata build/write
    /// error. A metadata file may be left behind if the registry write fails.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table> {
        if !self.namespace_exists(namespace).await? {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!("Namespace not found: {}", namespace.join(".")),
            ));
        }
        let table_ident = TableIdent::new(namespace.clone(), creation.name.clone());
        let key = Self::table_key(&table_ident);
        if self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_some()
        {
            return Err(Error::new(
                ErrorKind::TableAlreadyExists,
                format!(
                    "Table already exists: {}.{}",
                    namespace.join("."),
                    table_ident.name()
                ),
            ));
        }
        let location = match &creation.location {
            Some(location) => location.clone(),
            None => self.table_location(&table_ident),
        };
        let creation = TableCreation {
            location: Some(location.clone()),
            ..creation
        };
        let metadata = TableMetadataBuilder::from_table_creation(creation)?
            .build()?
            .metadata;
        let metadata_location = MetadataLocation::new_with_table_location(&location).to_string();
        metadata.write_to(&self.file_io, &metadata_location).await?;
        let registry_entry = TableRegistryEntry {
            namespace: namespace.iter().map(|s| s.to_string()).collect(),
            name: table_ident.name().to_string(),
            metadata_location: metadata_location.clone(),
        };
        let value = serde_json::to_vec(&registry_entry).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize table registry entry: {}", e),
            )
        })?;
        self.db
            .put(&key, &value)
            .await
            .map_err(Self::convert_error)?;
        Table::builder()
            .file_io(self.file_io.clone())
            .metadata_location(metadata_location)
            .metadata(metadata)
            .identifier(table_ident)
            .build()
    }

    /// Loads a table by reading the metadata file its registry entry points to.
    ///
    /// # Errors
    /// `TableNotFound` if unregistered; `Unexpected` for a corrupt entry; any
    /// error from reading the metadata file.
    async fn load_table(&self, table: &TableIdent) -> Result<Table> {
        let key = Self::table_key(table);
        let value = self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::TableNotFound,
                    format!(
                        "Table not found: {}.{}",
                        table.namespace.join("."),
                        table.name
                    ),
                )
            })?;
        let entry: TableRegistryEntry = serde_json::from_slice(&value).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to deserialize table registry entry: {}", e),
            )
        })?;
        let metadata = TableMetadata::read_from(&self.file_io, &entry.metadata_location).await?;
        Table::builder()
            .identifier(table.clone())
            .metadata(metadata)
            .metadata_location(entry.metadata_location)
            .file_io(self.file_io.clone())
            .build()
    }

    /// Removes the table's registry entry. Metadata and data files are not
    /// deleted.
    ///
    /// # Errors
    /// `TableNotFound` if the table is not registered.
    async fn drop_table(&self, table: &TableIdent) -> Result<()> {
        let key = Self::table_key(table);
        if self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_none()
        {
            return Err(Error::new(
                ErrorKind::TableNotFound,
                format!(
                    "Table not found: {}.{}",
                    table.namespace.join("."),
                    table.name
                ),
            ));
        }
        self.db.delete(&key).await.map_err(Self::convert_error)?;
        Ok(())
    }

    /// Returns whether a registry entry exists for `table`.
    async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
        let key = Self::table_key(table);
        Ok(self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_some())
    }

    /// Moves a table's registry entry from `src` to `dest`. Table files stay
    /// where they are.
    ///
    /// The destination entry is written before the source entry is deleted,
    /// so an interruption between the two leaves both entries rather than
    /// losing the table.
    ///
    /// # Errors
    /// `TableNotFound`, `TableAlreadyExists`, or `NamespaceNotFound` (for
    /// `dest`'s namespace).
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
        let src_key = Self::table_key(src);
        let dest_key = Self::table_key(dest);
        let value = self
            .db
            .get(&src_key)
            .await
            .map_err(Self::convert_error)?
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::TableNotFound,
                    format!(
                        "Source table not found: {}.{}",
                        src.namespace.join("."),
                        src.name
                    ),
                )
            })?;
        if self
            .db
            .get(&dest_key)
            .await
            .map_err(Self::convert_error)?
            .is_some()
        {
            return Err(Error::new(
                ErrorKind::TableAlreadyExists,
                format!(
                    "Destination table already exists: {}.{}",
                    dest.namespace.join("."),
                    dest.name
                ),
            ));
        }
        if !self.namespace_exists(&dest.namespace).await? {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!(
                    "Destination namespace not found: {}",
                    dest.namespace.join(".")
                ),
            ));
        }
        let mut entry: TableRegistryEntry = serde_json::from_slice(&value).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to deserialize table registry entry: {}", e),
            )
        })?;
        entry.namespace = dest.namespace.iter().map(|s| s.to_string()).collect();
        entry.name = dest.name.clone();
        let new_value = serde_json::to_vec(&entry).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize table registry entry: {}", e),
            )
        })?;
        self.db
            .put(&dest_key, &new_value)
            .await
            .map_err(Self::convert_error)?;
        self.db
            .delete(&src_key)
            .await
            .map_err(Self::convert_error)?;
        Ok(())
    }

    /// Registers an existing metadata file as table `table`.
    ///
    /// # Errors
    /// `TableAlreadyExists`, `NamespaceNotFound`, or any error reading the
    /// metadata file.
    async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table> {
        let key = Self::table_key(table);
        if self
            .db
            .get(&key)
            .await
            .map_err(Self::convert_error)?
            .is_some()
        {
            return Err(Error::new(
                ErrorKind::TableAlreadyExists,
                format!(
                    "Table already exists: {}.{}",
                    table.namespace.join("."),
                    table.name
                ),
            ));
        }
        if !self.namespace_exists(&table.namespace).await? {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!("Namespace not found: {}", table.namespace.join(".")),
            ));
        }
        // Read first so an unreadable file is rejected before registration.
        let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?;
        let entry = TableRegistryEntry {
            namespace: table.namespace.iter().map(|s| s.to_string()).collect(),
            name: table.name.clone(),
            metadata_location: metadata_location.clone(),
        };
        let value = serde_json::to_vec(&entry).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize table registry entry: {}", e),
            )
        })?;
        self.db
            .put(&key, &value)
            .await
            .map_err(Self::convert_error)?;
        Table::builder()
            .identifier(table.clone())
            .metadata(metadata)
            .metadata_location(metadata_location)
            .file_io(self.file_io.clone())
            .build()
    }

    /// Applies a commit: checks its requirements against the current
    /// metadata, applies its updates, writes the next metadata version, and
    /// repoints the registry entry.
    ///
    /// NOTE(review): load → check → write → put is not atomic; two concurrent
    /// commits on the same table can both pass their requirement checks and
    /// the later registry write wins.
    ///
    /// # Errors
    /// `TableNotFound`, a failed requirement, an update that cannot be
    /// applied, or any I/O / registry error.
    async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
        let table_ident = commit.identifier().clone();
        let key = Self::table_key(&table_ident);
        let current_table = self.load_table(&table_ident).await?;
        let current_metadata = current_table.metadata();
        let current_location = current_table
            .metadata_location()
            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Table has no metadata location"))?;
        for req in commit.take_requirements() {
            req.check(Some(current_metadata))?;
        }
        let mut builder = TableMetadataBuilder::new_from_metadata(
            current_metadata.clone(),
            Some(current_location.to_string()),
        );
        for update in commit.take_updates() {
            builder = update.apply(builder)?;
        }
        let new_metadata = builder.build()?.metadata;
        // Derive the next file from the current one so the version advances
        // (00000 -> 00001 -> 00002 ...). Starting from
        // `new_with_table_location` would always yield version 1. Files not
        // named `<version>-<uuid>.metadata.json` fall back to that behaviour.
        let new_metadata_location = current_location
            .parse::<MetadataLocation>()
            .map(|location| location.with_next_version())
            .unwrap_or_else(|_| {
                MetadataLocation::new_with_table_location(current_metadata.location())
                    .with_next_version()
            })
            .to_string();
        new_metadata
            .write_to(&self.file_io, &new_metadata_location)
            .await?;
        let entry = TableRegistryEntry {
            namespace: table_ident
                .namespace
                .iter()
                .map(|s| s.to_string())
                .collect(),
            name: table_ident.name().to_string(),
            metadata_location: new_metadata_location.clone(),
        };
        let value = serde_json::to_vec(&entry).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize table registry entry: {}", e),
            )
        })?;
        self.db
            .put(&key, &value)
            .await
            .map_err(Self::convert_error)?;
        Table::builder()
            .identifier(table_ident)
            .metadata(new_metadata)
            .metadata_location(new_metadata_location)
            .file_io(self.file_io.clone())
            .build()
    }
}
#[async_trait]
impl CatalogExt for SlateCatalog {
async fn commit_table(
&self,
table_ident: &TableIdent,
requirements: Vec<TableRequirement>,
updates: Vec<TableUpdate>,
) -> Result<Table> {
let table = self.load_table(table_ident).await?;
for requirement in &requirements {
requirement.check(Some(table.metadata()))?;
}
let current_metadata_location = table
.metadata_location()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Table has no metadata location"))?;
let mut metadata_builder = table
.metadata()
.clone()
.into_builder(Some(current_metadata_location.to_string()));
for update in updates {
metadata_builder = update.apply(metadata_builder)?;
}
let new_metadata = metadata_builder.build()?;
let table_location = table.metadata().location();
let new_metadata_location = MetadataLocation::new_with_table_location(table_location)
.with_next_version()
.to_string();
new_metadata
.metadata
.write_to(&self.file_io, &new_metadata_location)
.await?;
self.update_table_metadata_location(table_ident, new_metadata_location)
.await
}
async fn update_table_metadata_location(
&self,
table_ident: &TableIdent,
new_metadata_location: String,
) -> Result<Table> {
let key = Self::table_key(table_ident);
let metadata = TableMetadata::read_from(&self.file_io, &new_metadata_location).await?;
let entry = TableRegistryEntry {
namespace: table_ident
.namespace
.iter()
.map(|s| s.to_string())
.collect(),
name: table_ident.name().to_string(),
metadata_location: new_metadata_location.clone(),
};
let value = serde_json::to_vec(&entry).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize table registry entry: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Table::builder()
.identifier(table_ident.clone())
.metadata(metadata)
.metadata_location(new_metadata_location)
.file_io(self.file_io.clone())
.build()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a namespace identifier from string slices.
    fn ns(parts: &[&str]) -> NamespaceIdent {
        NamespaceIdent::from_vec(parts.iter().map(|p| p.to_string()).collect()).unwrap()
    }

    #[test]
    fn test_namespace_key() {
        let encoded = SlateCatalog::namespace_key(&ns(&["db", "schema"]));
        assert_eq!(encoded.as_slice(), b"namespace:db.schema");
    }

    #[test]
    fn test_table_key() {
        let ident = TableIdent::new(ns(&["db"]), "my_table".to_string());
        let encoded = SlateCatalog::table_key(&ident);
        assert_eq!(encoded.as_slice(), b"table:db:my_table");
    }

    #[test]
    fn test_table_prefix() {
        assert_eq!(
            SlateCatalog::table_prefix(&ns(&["db", "schema"])),
            "table:db.schema:"
        );
    }
}