use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent, TableRequirement, TableUpdate,
};
use serde::{Deserialize, Serialize};
use slatedb::{Db, WriteBatch};
use super::CatalogExt;
/// Value stored under a `namespace:<dotted.path>` key in SlateDB.
///
/// Serialized as JSON; holds the full namespace path plus its properties.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct NamespaceMetadata {
    // Namespace path components, e.g. ["db", "schema"].
    namespace: Vec<String>,
    // User-supplied namespace properties.
    properties: HashMap<String, String>,
}
/// Value stored under a `table:<namespace>:<name>` key in SlateDB.
///
/// Points at the current Iceberg metadata file and carries a monotonically
/// increasing `version` used for optimistic-concurrency checks on commit.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TableRegistryEntry {
    // Namespace path components of the owning namespace.
    namespace: Vec<String>,
    // Table name within the namespace.
    name: String,
    // Location of the current table-metadata JSON file.
    metadata_location: String,
    // Commit counter; `default` keeps entries written before this field
    // existed deserializable as version 0.
    #[serde(default)]
    version: u64,
}
/// Iceberg catalog backed by SlateDB for the namespace/table registry and
/// by `FileIO` for table metadata files under `warehouse_location`.
pub struct SlateCatalog {
    // Registry of namespaces and table pointers.
    db: Arc<Db>,
    // Reads/writes Iceberg metadata files.
    file_io: FileIO,
    // Normalized warehouse root (e.g. `file:///...` or `s3://...`).
    warehouse_location: String,
    // Timeouts applied to metadata read/write I/O.
    io_timeouts: IoTimeoutConfig,
}
/// Timeouts applied to table-metadata file I/O performed by the catalog.
#[derive(Debug, Clone)]
pub struct IoTimeoutConfig {
    /// Maximum time allowed for reading a metadata file.
    pub read_timeout: std::time::Duration,
    /// Maximum time allowed for writing a metadata file.
    pub write_timeout: std::time::Duration,
}
impl Default for IoTimeoutConfig {
    /// Defaults: 60-second reads, 30-second writes.
    fn default() -> Self {
        let read_timeout = std::time::Duration::from_secs(60);
        let write_timeout = std::time::Duration::from_secs(30);
        Self {
            read_timeout,
            write_timeout,
        }
    }
}
impl IoTimeoutConfig {
    /// Builds a config from explicit read and write timeouts.
    pub fn new(read_timeout: std::time::Duration, write_timeout: std::time::Duration) -> Self {
        Self {
            read_timeout,
            write_timeout,
        }
    }

    /// Returns a copy of `self` with the read timeout replaced.
    pub fn with_read_timeout(self, timeout: std::time::Duration) -> Self {
        Self {
            read_timeout: timeout,
            ..self
        }
    }

    /// Returns a copy of `self` with the write timeout replaced.
    pub fn with_write_timeout(self, timeout: std::time::Duration) -> Self {
        Self {
            write_timeout: timeout,
            ..self
        }
    }
}
impl std::fmt::Debug for SlateCatalog {
    /// Shows only the warehouse location; the DB handle and FileIO have no
    /// useful `Debug` output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut dbg = f.debug_struct("SlateCatalog");
        dbg.field("warehouse_location", &self.warehouse_location);
        dbg.finish_non_exhaustive()
    }
}
impl SlateCatalog {
pub async fn new(db: Arc<Db>, warehouse_location: String) -> Result<Self> {
Self::with_timeouts(db, warehouse_location, IoTimeoutConfig::default()).await
}
pub async fn with_timeouts(
db: Arc<Db>,
warehouse_location: String,
io_timeouts: IoTimeoutConfig,
) -> Result<Self> {
let warehouse_location = Self::normalize_and_ensure_local_directory(&warehouse_location)?;
let file_io = FileIO::from_path(&warehouse_location)?.build()?;
Ok(Self {
db,
file_io,
warehouse_location,
io_timeouts,
})
}
pub async fn with_props(
db: Arc<Db>,
warehouse_location: String,
props: HashMap<String, String>,
) -> Result<Self> {
Self::with_props_and_timeouts(db, warehouse_location, props, IoTimeoutConfig::default())
.await
}
pub async fn with_props_and_timeouts(
db: Arc<Db>,
warehouse_location: String,
props: HashMap<String, String>,
io_timeouts: IoTimeoutConfig,
) -> Result<Self> {
let warehouse_location = Self::normalize_and_ensure_local_directory(&warehouse_location)?;
let file_io = FileIO::from_path(&warehouse_location)?
.with_props(props)
.build()?;
Ok(Self {
db,
file_io,
warehouse_location,
io_timeouts,
})
}
fn normalize_and_ensure_local_directory(warehouse_location: &str) -> Result<String> {
if warehouse_location.starts_with("s3://")
|| warehouse_location.starts_with("gs://")
|| warehouse_location.starts_with("az://")
|| warehouse_location.starts_with("memory://")
{
return Ok(warehouse_location.to_string());
}
let path = if warehouse_location.starts_with("file://") {
warehouse_location
.strip_prefix("file://")
.unwrap_or(warehouse_location)
} else {
warehouse_location
};
let absolute_path = if std::path::Path::new(path).is_absolute() {
std::path::PathBuf::from(path)
} else {
std::env::current_dir()
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to get current directory: {}", e),
)
})?
.join(path)
};
std::fs::create_dir_all(&absolute_path).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!(
"Failed to create warehouse directory '{}': {}",
absolute_path.display(),
e
),
)
})?;
let normalized = format!("file://{}", absolute_path.display());
tracing::info!(
original = %warehouse_location,
normalized = %normalized,
"Normalized warehouse location"
);
Ok(normalized)
}
    /// SlateDB key for a namespace: `namespace:<parts joined by '.'>`.
    fn namespace_key(namespace: &NamespaceIdent) -> Vec<u8> {
        format!("namespace:{}", namespace.join(".")).into_bytes()
    }
    /// SlateDB key for a table: `table:<dotted namespace>:<name>`.
    // NOTE(review): namespace parts containing '.' or names containing ':'
    // could produce colliding keys — assumes identifiers avoid these chars.
    fn table_key(table: &TableIdent) -> Vec<u8> {
        format!("table:{}:{}", table.namespace.join("."), table.name).into_bytes()
    }
    /// Key prefix matching every table registered under `namespace`.
    fn table_prefix(namespace: &NamespaceIdent) -> String {
        format!("table:{}:", namespace.join("."))
    }
    /// Default storage location for a table under the warehouse root.
    fn table_location(&self, table: &TableIdent) -> String {
        format!(
            "{}/{}/{}",
            self.warehouse_location,
            table.namespace.join("/"),
            table.name
        )
    }
    /// Wraps a SlateDB error in an iceberg `Error`.
    fn convert_error(err: slatedb::Error) -> Error {
        Error::new(ErrorKind::Unexpected, format!("SlateDB error: {}", err))
    }
    /// Rebuilds a `NamespaceIdent` from stored path components.
    fn make_namespace_ident(parts: Vec<String>) -> Result<NamespaceIdent> {
        NamespaceIdent::from_vec(parts).map_err(|e| {
            Error::new(
                ErrorKind::DataInvalid,
                format!("Invalid namespace identifier in stored data: {}", e),
            )
        })
    }
async fn read_metadata_with_timeout(&self, location: &str) -> Result<TableMetadata> {
tokio::time::timeout(
self.io_timeouts.read_timeout,
TableMetadata::read_from(&self.file_io, location),
)
.await
.map_err(|_| {
Error::new(
ErrorKind::Unexpected,
format!(
"Timeout reading table metadata from '{}' after {:?}",
location, self.io_timeouts.read_timeout
),
)
})?
}
async fn write_metadata_with_timeout(
&self,
metadata: &TableMetadata,
location: &str,
) -> Result<()> {
tokio::time::timeout(
self.io_timeouts.write_timeout,
metadata.write_to(&self.file_io, location),
)
.await
.map_err(|_| {
Error::new(
ErrorKind::Unexpected,
format!(
"Timeout writing table metadata to '{}' after {:?}",
location, self.io_timeouts.write_timeout
),
)
})?
}
}
#[async_trait]
impl Catalog for SlateCatalog {
async fn list_namespaces(
&self,
parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
let prefix = if let Some(p) = parent {
format!("namespace:{}.", p.join("."))
} else {
"namespace:".to_string()
};
let mut namespaces = Vec::new();
let mut iter = self
.db
.scan_prefix(prefix.as_bytes())
.await
.map_err(Self::convert_error)?;
while let Some(kv) = iter.next().await.map_err(Self::convert_error)? {
if let Ok(metadata) = serde_json::from_slice::<NamespaceMetadata>(&kv.value) {
if let Some(parent_ns) = parent {
let parent_parts: Vec<&str> = parent_ns.iter().map(|s| s.as_str()).collect();
if metadata.namespace.len() == parent_parts.len() + 1
&& metadata.namespace[..parent_parts.len()]
.iter()
.zip(parent_parts.iter())
.all(|(a, b)| a == *b)
{
namespaces.push(Self::make_namespace_ident(metadata.namespace)?);
}
} else if metadata.namespace.len() == 1 {
namespaces.push(Self::make_namespace_ident(metadata.namespace)?);
}
}
}
Ok(namespaces)
}
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<Namespace> {
let key = Self::namespace_key(namespace);
if self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_some()
{
return Err(Error::new(
ErrorKind::NamespaceAlreadyExists,
format!("Namespace already exists: {}", namespace.join(".")),
));
}
let metadata = NamespaceMetadata {
namespace: namespace.iter().map(|s| s.to_string()).collect(),
properties: properties.clone(),
};
let value = serde_json::to_vec(&metadata).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize namespace metadata: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Ok(Namespace::with_properties(namespace.clone(), properties))
}
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
let key = Self::namespace_key(namespace);
let value = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| {
Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace not found: {}", namespace.join(".")),
)
})?;
let metadata: NamespaceMetadata = serde_json::from_slice(&value).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize namespace metadata: {}", e),
)
})?;
Ok(Namespace::with_properties(
namespace.clone(),
metadata.properties,
))
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
let key = Self::namespace_key(namespace);
Ok(self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_some())
}
async fn update_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
let key = Self::namespace_key(namespace);
if self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_none()
{
return Err(Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace not found: {}", namespace.join(".")),
));
}
let metadata = NamespaceMetadata {
namespace: namespace.iter().map(|s| s.to_string()).collect(),
properties,
};
let new_value = serde_json::to_vec(&metadata).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize namespace metadata: {}", e),
)
})?;
self.db
.put(&key, &new_value)
.await
.map_err(Self::convert_error)?;
Ok(())
}
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
let key = Self::namespace_key(namespace);
if self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_none()
{
return Err(Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace not found: {}", namespace.join(".")),
));
}
let prefix = Self::table_prefix(namespace);
let mut iter = self
.db
.scan_prefix(prefix.as_bytes())
.await
.map_err(Self::convert_error)?;
if iter.next().await.map_err(Self::convert_error)?.is_some() {
return Err(Error::new(
ErrorKind::DataInvalid,
format!("Namespace not empty: {}", namespace.join(".")),
));
}
self.db.delete(&key).await.map_err(Self::convert_error)?;
Ok(())
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
if !self.namespace_exists(namespace).await? {
return Err(Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace not found: {}", namespace.join(".")),
));
}
let prefix = Self::table_prefix(namespace);
let mut tables = Vec::new();
let mut iter = self
.db
.scan_prefix(prefix.as_bytes())
.await
.map_err(Self::convert_error)?;
while let Some(kv) = iter.next().await.map_err(Self::convert_error)? {
if let Ok(entry) = serde_json::from_slice::<TableRegistryEntry>(&kv.value) {
tables.push(TableIdent::new(
Self::make_namespace_ident(entry.namespace)?,
entry.name,
));
}
}
Ok(tables)
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
if !self.namespace_exists(namespace).await? {
return Err(Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace not found: {}", namespace.join(".")),
));
}
let table_name = creation.name.clone();
let table_ident = TableIdent::new(namespace.clone(), table_name);
let key = Self::table_key(&table_ident);
if self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_some()
{
return Err(Error::new(
ErrorKind::TableAlreadyExists,
format!("Table already exists: {}", table_ident.name()),
));
}
let (creation, location) = match creation.location.clone() {
Some(loc) => (creation, loc),
None => {
let location = self.table_location(&table_ident);
let new_creation = TableCreation {
location: Some(location.clone()),
..creation
};
(new_creation, location)
}
};
let metadata = TableMetadataBuilder::from_table_creation(creation)?
.build()?
.metadata;
let metadata_location = MetadataLocation::new_with_table_location(&location).to_string();
self.write_metadata_with_timeout(&metadata, &metadata_location)
.await?;
let registry_entry = TableRegistryEntry {
namespace: namespace.iter().map(|s| s.to_string()).collect(),
name: table_ident.name().to_string(),
metadata_location: metadata_location.clone(),
version: 0, };
let value = serde_json::to_vec(®istry_entry).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize table registry entry: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Table::builder()
.file_io(self.file_io.clone())
.metadata_location(metadata_location)
.metadata(metadata)
.identifier(table_ident)
.build()
}
async fn load_table(&self, table: &TableIdent) -> Result<Table> {
let key = Self::table_key(table);
let value = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| {
Error::new(
ErrorKind::TableNotFound,
format!(
"Table not found: {}.{}",
table.namespace.join("."),
table.name
),
)
})?;
let entry: TableRegistryEntry = serde_json::from_slice(&value).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize table registry entry: {}", e),
)
})?;
let metadata = self
.read_metadata_with_timeout(&entry.metadata_location)
.await?;
Table::builder()
.identifier(table.clone())
.metadata(metadata)
.metadata_location(entry.metadata_location)
.file_io(self.file_io.clone())
.build()
}
async fn drop_table(&self, table: &TableIdent) -> Result<()> {
let key = Self::table_key(table);
if self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_none()
{
return Err(Error::new(
ErrorKind::TableNotFound,
format!(
"Table not found: {}.{}",
table.namespace.join("."),
table.name
),
));
}
self.db.delete(&key).await.map_err(Self::convert_error)?;
Ok(())
}
async fn table_exists(&self, table: &TableIdent) -> Result<bool> {
let key = Self::table_key(table);
Ok(self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_some())
}
    /// Moves a table's registry entry from `src` to `dest` in one atomic
    /// SlateDB write batch (put new key + delete old key). The table's
    /// metadata location is unchanged; only its identity is rewritten.
    async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
        let src_key = Self::table_key(src);
        let dest_key = Self::table_key(dest);
        let value = self
            .db
            .get(&src_key)
            .await
            .map_err(Self::convert_error)?
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::TableNotFound,
                    format!(
                        "Source table not found: {}.{}",
                        src.namespace.join("."),
                        src.name
                    ),
                )
            })?;
        // Refuse to overwrite an existing destination (this also rejects a
        // rename onto itself, since src and dest then share the same key).
        if self
            .db
            .get(&dest_key)
            .await
            .map_err(Self::convert_error)?
            .is_some()
        {
            return Err(Error::new(
                ErrorKind::TableAlreadyExists,
                format!(
                    "Destination table already exists: {}.{}",
                    dest.namespace.join("."),
                    dest.name
                ),
            ));
        }
        if !self.namespace_exists(&dest.namespace).await? {
            return Err(Error::new(
                ErrorKind::NamespaceNotFound,
                format!(
                    "Destination namespace not found: {}",
                    dest.namespace.join(".")
                ),
            ));
        }
        // Rewrite the stored identity to the destination.
        let mut entry: TableRegistryEntry = serde_json::from_slice(&value).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to deserialize table registry entry: {}", e),
            )
        })?;
        entry.namespace = dest.namespace.iter().map(|s| s.to_string()).collect();
        entry.name = dest.name.clone();
        let new_value = serde_json::to_vec(&entry).map_err(|e| {
            Error::new(
                ErrorKind::Unexpected,
                format!("Failed to serialize table registry entry: {}", e),
            )
        })?;
        // Both mutations land atomically in a single batch.
        let mut batch = WriteBatch::new();
        batch.put(&dest_key, &new_value);
        batch.delete(&src_key);
        self.db.write(batch).await.map_err(Self::convert_error)?;
        Ok(())
    }
async fn register_table(&self, table: &TableIdent, metadata_location: String) -> Result<Table> {
let key = Self::table_key(table);
if self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.is_some()
{
return Err(Error::new(
ErrorKind::TableAlreadyExists,
format!(
"Table already exists: {}.{}",
table.namespace.join("."),
table.name
),
));
}
if !self.namespace_exists(&table.namespace).await? {
return Err(Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace not found: {}", table.namespace.join(".")),
));
}
let metadata = self.read_metadata_with_timeout(&metadata_location).await?;
let entry = TableRegistryEntry {
namespace: table.namespace.iter().map(|s| s.to_string()).collect(),
name: table.name.clone(),
metadata_location: metadata_location.clone(),
version: 0, };
let value = serde_json::to_vec(&entry).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize table registry entry: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Table::builder()
.identifier(table.clone())
.metadata(metadata)
.metadata_location(metadata_location)
.file_io(self.file_io.clone())
.build()
}
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
let table_ident = commit.identifier().clone();
let key = Self::table_key(&table_ident);
let current_table = self.load_table(&table_ident).await?;
let current_metadata = current_table.metadata().clone();
let current_location = current_table
.metadata_location()
.ok_or_else(|| Error::new(ErrorKind::Unexpected, "Table has no metadata location"))?;
let current_entry_bytes = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| Error::new(ErrorKind::TableNotFound, "Table not found during update"))?;
let current_entry: TableRegistryEntry = serde_json::from_slice(¤t_entry_bytes)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize entry: {}", e),
)
})?;
let expected_version = current_entry.version;
for req in commit.take_requirements() {
req.check(Some(¤t_metadata))?;
}
let mut builder = TableMetadataBuilder::new_from_metadata(
current_metadata.clone(),
Some(current_location.to_string()),
);
for update in commit.take_updates() {
builder = update.apply(builder)?;
}
let new_metadata = builder.build()?.metadata;
let table_location = current_table.metadata().location();
let new_metadata_location = MetadataLocation::new_with_table_location(table_location)
.with_next_version()
.to_string();
self.write_metadata_with_timeout(&new_metadata, &new_metadata_location)
.await?;
let verify_entry_bytes = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| {
Error::new(ErrorKind::TableNotFound, "Table disappeared during update")
})?;
let verify_entry: TableRegistryEntry = serde_json::from_slice(&verify_entry_bytes)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize entry: {}", e),
)
})?;
if verify_entry.version != expected_version {
return Err(Error::new(
ErrorKind::CatalogCommitConflicts,
format!(
"Commit conflict: table was modified by another process (expected version {}, found {})",
expected_version, verify_entry.version
),
));
}
let entry = TableRegistryEntry {
namespace: table_ident
.namespace
.iter()
.map(|s| s.to_string())
.collect(),
name: table_ident.name().to_string(),
metadata_location: new_metadata_location.clone(),
version: expected_version + 1, };
let value = serde_json::to_vec(&entry).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize table registry entry: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Table::builder()
.identifier(table_ident)
.metadata(new_metadata)
.metadata_location(new_metadata_location)
.file_io(self.file_io.clone())
.build()
}
}
#[async_trait]
impl CatalogExt for SlateCatalog {
async fn commit_table(
&self,
table_ident: &TableIdent,
requirements: Vec<TableRequirement>,
updates: Vec<TableUpdate>,
) -> Result<Table> {
let key = Self::table_key(table_ident);
let table = self.load_table(table_ident).await?;
let current_entry_bytes = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| Error::new(ErrorKind::TableNotFound, "Table not found"))?;
let current_entry: TableRegistryEntry = serde_json::from_slice(¤t_entry_bytes)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize entry: {}", e),
)
})?;
let expected_version = current_entry.version;
let expected_metadata_location = current_entry.metadata_location.clone();
for requirement in &requirements {
requirement.check(Some(table.metadata()))?;
}
let current_metadata_location = table
.metadata_location()
.ok_or_else(|| Error::new(ErrorKind::DataInvalid, "Table has no metadata location"))?;
let mut metadata_builder = table
.metadata()
.clone()
.into_builder(Some(current_metadata_location.to_string()));
for update in updates {
metadata_builder = update.apply(metadata_builder)?;
}
let new_metadata = metadata_builder.build()?;
let table_location = table.metadata().location();
let new_metadata_location = MetadataLocation::new_with_table_location(table_location)
.with_next_version()
.to_string();
self.write_metadata_with_timeout(&new_metadata.metadata, &new_metadata_location)
.await?;
let verify_entry_bytes = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| {
Error::new(ErrorKind::TableNotFound, "Table disappeared during commit")
})?;
let verify_entry: TableRegistryEntry = serde_json::from_slice(&verify_entry_bytes)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize entry: {}", e),
)
})?;
if verify_entry.version != expected_version
|| verify_entry.metadata_location != expected_metadata_location
{
return Err(Error::new(
ErrorKind::CatalogCommitConflicts,
format!(
"Commit conflict: table was modified by another process (expected version {}, found {})",
expected_version, verify_entry.version
),
));
}
let metadata = self
.read_metadata_with_timeout(&new_metadata_location)
.await?;
let entry = TableRegistryEntry {
namespace: table_ident
.namespace
.iter()
.map(|s| s.to_string())
.collect(),
name: table_ident.name().to_string(),
metadata_location: new_metadata_location.clone(),
version: expected_version + 1, };
let value = serde_json::to_vec(&entry).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize table registry entry: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Table::builder()
.identifier(table_ident.clone())
.metadata(metadata)
.metadata_location(new_metadata_location)
.file_io(self.file_io.clone())
.build()
}
async fn update_table_metadata_location(
&self,
table_ident: &TableIdent,
new_metadata_location: String,
) -> Result<Table> {
let key = Self::table_key(table_ident);
let current_entry_bytes = self
.db
.get(&key)
.await
.map_err(Self::convert_error)?
.ok_or_else(|| Error::new(ErrorKind::TableNotFound, "Table not found"))?;
let current_entry: TableRegistryEntry = serde_json::from_slice(¤t_entry_bytes)
.map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to deserialize entry: {}", e),
)
})?;
let expected_version = current_entry.version;
let metadata = self
.read_metadata_with_timeout(&new_metadata_location)
.await?;
let entry = TableRegistryEntry {
namespace: table_ident
.namespace
.iter()
.map(|s| s.to_string())
.collect(),
name: table_ident.name().to_string(),
metadata_location: new_metadata_location.clone(),
version: expected_version + 1, };
let value = serde_json::to_vec(&entry).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("Failed to serialize table registry entry: {}", e),
)
})?;
self.db
.put(&key, &value)
.await
.map_err(Self::convert_error)?;
Table::builder()
.identifier(table_ident.clone())
.metadata(metadata)
.metadata_location(new_metadata_location)
.file_io(self.file_io.clone())
.build()
}
async fn commit_tables_atomic(
&self,
table_changes: Vec<(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)>,
) -> Result<Vec<Table>> {
use std::time::Duration;
use tokio::time::sleep;
const MAX_RETRIES: u32 = 10;
const BASE_BACKOFF_MS: u64 = 10;
if table_changes.len() == 1 {
let (ident, reqs, updates) = table_changes.into_iter().next().unwrap();
let table = self.commit_table(&ident, reqs, updates).await?;
return Ok(vec![table]);
}
for attempt in 0..MAX_RETRIES {
match self.try_commit_tables_atomic(&table_changes).await {
Ok(tables) => return Ok(tables),
Err(e) if e.kind() == ErrorKind::CatalogCommitConflicts => {
let backoff_ms = BASE_BACKOFF_MS * (1 << attempt.min(6)); tracing::debug!(
attempt = attempt + 1,
max_retries = MAX_RETRIES,
backoff_ms = backoff_ms,
"Commit conflict detected, retrying"
);
sleep(Duration::from_millis(backoff_ms)).await;
}
Err(e) => return Err(e),
}
}
Err(Error::new(
ErrorKind::CatalogCommitConflicts,
format!(
"Failed to commit atomic transaction after {} retries",
MAX_RETRIES
),
))
}
async fn storage_health_check(&self) -> Result<super::StorageHealthStatus> {
use std::time::Instant;
let start = Instant::now();
let backend_type = if self.warehouse_location.starts_with("s3://") {
"s3"
} else if self.warehouse_location.starts_with("gs://") {
"gcs"
} else if self.warehouse_location.starts_with("az://")
|| self.warehouse_location.starts_with("abfs://")
{
"azure"
} else if self.warehouse_location.starts_with("file://") {
"file"
} else if self.warehouse_location.starts_with("memory://") {
"memory"
} else {
"unknown"
};
match self.file_io.exists(&self.warehouse_location).await {
Ok(_) => {
let latency_ms = start.elapsed().as_millis() as u64;
Ok(super::StorageHealthStatus::healthy(
backend_type,
latency_ms,
))
}
Err(e) => {
let err_str = e.to_string();
if err_str.contains("NotFound") || err_str.contains("not found") {
let latency_ms = start.elapsed().as_millis() as u64;
Ok(super::StorageHealthStatus::healthy(
backend_type,
latency_ms,
))
} else {
Ok(super::StorageHealthStatus::unhealthy(
backend_type,
format!("Storage check failed: {}", e),
))
}
}
}
}
}
impl SlateCatalog {
    /// One attempt at a multi-table atomic commit; driven in a retry loop by
    /// `commit_tables_atomic`.
    ///
    /// Phases: (1) snapshot every table and its registry entry, (2) check
    /// all requirements, (3) build and write each table's new metadata file,
    /// (4) re-verify that no registry entry moved since the snapshot, and
    /// (5) publish all registry entries in a single SlateDB write batch.
    /// A conflict detected in phase 4 aborts the whole commit; files written
    /// in phase 3 are then simply left unreferenced.
    async fn try_commit_tables_atomic(
        &self,
        table_changes: &[(TableIdent, Vec<TableRequirement>, Vec<TableUpdate>)],
    ) -> Result<Vec<Table>> {
        // Phase 1: load current table state + registry entry for each table.
        let mut table_states: Vec<(
            &TableIdent,
            Table,
            TableRegistryEntry,
            Vec<u8>, // registry key
        )> = Vec::with_capacity(table_changes.len());
        for (ident, _, _) in table_changes {
            let key = Self::table_key(ident);
            let table = self.load_table(ident).await?;
            let entry_bytes = self
                .db
                .get(&key)
                .await
                .map_err(Self::convert_error)?
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::TableNotFound,
                        format!("Table not found: {}", ident),
                    )
                })?;
            let entry: TableRegistryEntry = serde_json::from_slice(&entry_bytes).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to deserialize entry: {}", e),
                )
            })?;
            table_states.push((ident, table, entry, key));
        }
        // Phase 2: check every requirement before touching storage.
        for ((ident, requirements, _), (_, table, _, _)) in
            table_changes.iter().zip(table_states.iter())
        {
            for requirement in requirements {
                requirement.check(Some(table.metadata())).map_err(|e| {
                    Error::new(
                        e.kind(),
                        format!("Requirement failed for table {}: {}", ident, e.message()),
                    )
                })?;
            }
        }
        // Phase 3: apply updates and write each table's new metadata file.
        #[allow(clippy::type_complexity)]
        let mut prepared_updates: Vec<(
            &TableIdent,
            String, // new metadata location
            TableRegistryEntry, // new registry entry
            Vec<u8>, // registry key
            u64, // expected version
            String, // expected metadata location
        )> = Vec::with_capacity(table_changes.len());
        for ((ident, _, updates), (_, table, current_entry, key)) in
            table_changes.iter().zip(table_states.iter())
        {
            let current_metadata_location = table.metadata_location().ok_or_else(|| {
                Error::new(ErrorKind::DataInvalid, "Table has no metadata location")
            })?;
            let mut metadata_builder = table
                .metadata()
                .clone()
                .into_builder(Some(current_metadata_location.to_string()));
            for update in updates {
                metadata_builder = update.clone().apply(metadata_builder)?;
            }
            let new_metadata = metadata_builder.build()?;
            let table_location = table.metadata().location();
            let new_metadata_location = MetadataLocation::new_with_table_location(table_location)
                .with_next_version()
                .to_string();
            // Written before verification; orphaned (never referenced) if
            // the commit conflicts below.
            self.write_metadata_with_timeout(&new_metadata.metadata, &new_metadata_location)
                .await?;
            let new_entry = TableRegistryEntry {
                namespace: ident.namespace.iter().map(|s| s.to_string()).collect(),
                name: ident.name().to_string(),
                metadata_location: new_metadata_location.clone(),
                version: current_entry.version + 1,
            };
            prepared_updates.push((
                ident,
                new_metadata_location,
                new_entry,
                key.clone(),
                current_entry.version,
                current_entry.metadata_location.clone(),
            ));
        }
        // Phase 4: re-read every registry entry and abort the whole commit
        // if any table moved since the phase-1 snapshot.
        for (ident, _, _, key, expected_version, expected_location) in &prepared_updates {
            let verify_bytes = self
                .db
                .get(key)
                .await
                .map_err(Self::convert_error)?
                .ok_or_else(|| {
                    Error::new(
                        ErrorKind::TableNotFound,
                        format!("Table disappeared during commit: {}", ident),
                    )
                })?;
            let verify_entry: TableRegistryEntry =
                serde_json::from_slice(&verify_bytes).map_err(|e| {
                    Error::new(
                        ErrorKind::Unexpected,
                        format!("Failed to deserialize entry: {}", e),
                    )
                })?;
            if verify_entry.version != *expected_version
                || &verify_entry.metadata_location != expected_location
            {
                return Err(Error::new(
                    ErrorKind::CatalogCommitConflicts,
                    format!(
                        "Commit conflict on table {}: expected version {}, found {}",
                        ident, expected_version, verify_entry.version
                    ),
                ));
            }
        }
        // Phase 5: publish every registry entry in one atomic write batch.
        let mut batch = WriteBatch::new();
        for (_, _, new_entry, key, _, _) in &prepared_updates {
            let value = serde_json::to_vec(new_entry).map_err(|e| {
                Error::new(
                    ErrorKind::Unexpected,
                    format!("Failed to serialize table registry entry: {}", e),
                )
            })?;
            batch.put(key, &value);
        }
        self.db.write(batch).await.map_err(Self::convert_error)?;
        // Build the result tables from the freshly committed metadata.
        let mut result_tables = Vec::with_capacity(prepared_updates.len());
        for (ident, new_metadata_location, _, _, _, _) in prepared_updates {
            let metadata = self
                .read_metadata_with_timeout(&new_metadata_location)
                .await?;
            let table = Table::builder()
                .identifier(ident.clone())
                .metadata(metadata)
                .metadata_location(new_metadata_location)
                .file_io(self.file_io.clone())
                .build()?;
            result_tables.push(table);
        }
        Ok(result_tables)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Namespaces are stored under `namespace:` + the dotted path.
    #[test]
    fn test_namespace_key() {
        let ns = NamespaceIdent::from_vec(vec!["db".to_string(), "schema".to_string()]).unwrap();
        assert_eq!(
            String::from_utf8_lossy(&SlateCatalog::namespace_key(&ns)),
            "namespace:db.schema"
        );
    }

    // Tables are stored under `table:` + dotted namespace + `:` + name.
    #[test]
    fn test_table_key() {
        let ns = NamespaceIdent::from_vec(vec!["db".to_string()]).unwrap();
        let ident = TableIdent::new(ns, "my_table".to_string());
        assert_eq!(
            String::from_utf8_lossy(&SlateCatalog::table_key(&ident)),
            "table:db:my_table"
        );
    }

    // The prefix ends with ':' so it matches every table in the namespace.
    #[test]
    fn test_table_prefix() {
        let ns = NamespaceIdent::from_vec(vec!["db".to_string(), "schema".to_string()]).unwrap();
        assert_eq!(SlateCatalog::table_prefix(&ns), "table:db.schema:");
    }
}