use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use iceberg::io::{FileIOBuilder, StorageFactory};
use iceberg::{CatalogBuilder, Error, ErrorKind, Result};
use crate::catalog::RedbCatalog;
use crate::error::map_redb;
use crate::store::Store;
/// Property key for the database path; the value is converted to a `PathBuf`
/// and handed to the store when the catalog is loaded.
pub const REDB_CATALOG_PROP_DB_PATH: &str = "redb.db-path";
/// Property key for the warehouse location stored on the loaded catalog.
pub const REDB_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
/// Property key for the metadata cache size in bytes; the value must parse as a `u64`.
pub const REDB_CATALOG_PROP_METADATA_CACHE_BYTES: &str = "redb.metadata-cache-bytes";
/// Property key for the table-handle cache capacity; the value must parse as a `u64`.
pub const REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY: &str = "redb.table-handle-cache-capacity";
/// Property key for the write durability; accepted values are `immediate`,
/// `eventual` and `none` (surrounding whitespace and ASCII case are ignored).
pub const REDB_CATALOG_PROP_DURABILITY: &str = "redb.durability";
/// Write durability level requested for the catalog's redb store.
///
/// Each variant is translated to the `redb::Durability` variant of the same
/// name; see the redb documentation for the exact guarantees of each level.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum WriteDurability {
/// Maps to `redb::Durability::Immediate`. This is the default level.
#[default]
Immediate,
/// Maps to `redb::Durability::Eventual`.
Eventual,
/// Maps to `redb::Durability::None`.
None,
}
impl WriteDurability {
    /// Translates this level into the `redb::Durability` variant of the same name.
    pub(crate) fn to_redb(self) -> redb::Durability {
        match self {
            Self::Immediate => redb::Durability::Immediate,
            Self::Eventual => redb::Durability::Eventual,
            Self::None => redb::Durability::None,
        }
    }

    /// Parses a textual durability level.
    ///
    /// Surrounding whitespace is ignored and the comparison is ASCII
    /// case-insensitive. Returns `Option::None` when the text is not one of
    /// `immediate`, `eventual` or `none`.
    fn parse(s: &str) -> Option<Self> {
        // Lookup table of accepted spellings and the level each one selects.
        const LEVELS: [(&str, WriteDurability); 3] = [
            ("immediate", WriteDurability::Immediate),
            ("eventual", WriteDurability::Eventual),
            ("none", WriteDurability::None),
        ];

        let wanted = s.trim();
        LEVELS
            .iter()
            .find(|entry| wanted.eq_ignore_ascii_case(entry.0))
            .map(|entry| entry.1)
    }
}
/// Builder that collects the settings needed to load a `RedbCatalog`.
///
/// Every field starts out unset (`Default`); values can be supplied through
/// the typed setter methods or as string properties.
#[derive(Debug, Default)]
pub struct RedbCatalogBuilder {
// Database path passed to the store when the catalog is loaded.
db_path: Option<PathBuf>,
// Warehouse location stored on the loaded catalog.
warehouse_location: Option<String>,
// Storage factory used to build the catalog's file IO.
storage_factory: Option<Arc<dyn StorageFactory>>,
// Size passed to the metadata cache constructor; a crate default is used when unset.
metadata_cache_bytes: Option<u64>,
// Capacity passed to the table-handle cache constructor; a crate default is used when unset.
table_handle_cache_capacity: Option<u64>,
// Write durability for the store; `WriteDurability::default()` is used when unset.
durability: Option<WriteDurability>,
// String properties accumulated before and during `load`.
props: HashMap<String, String>,
}
impl RedbCatalogBuilder {
    /// Sets the database path, replacing any previously configured value.
    pub fn db_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            db_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the warehouse location, replacing any previously configured value.
    pub fn warehouse_location(self, loc: impl Into<String>) -> Self {
        Self {
            warehouse_location: Some(loc.into()),
            ..self
        }
    }

    /// Sets the metadata cache size in bytes.
    pub fn metadata_cache_bytes(self, bytes: u64) -> Self {
        Self {
            metadata_cache_bytes: Some(bytes),
            ..self
        }
    }

    /// Sets the table-handle cache capacity.
    pub fn table_handle_cache_capacity(self, capacity: u64) -> Self {
        Self {
            table_handle_cache_capacity: Some(capacity),
            ..self
        }
    }

    /// Sets the write durability level.
    pub fn durability(self, durability: WriteDurability) -> Self {
        Self {
            durability: Some(durability),
            ..self
        }
    }

    /// Merges `props` into the stored properties; entries with an existing
    /// key overwrite the earlier value.
    pub fn props(mut self, props: HashMap<String, String>) -> Self {
        self.props.extend(props);
        self
    }

    /// Stores a single property, overwriting any earlier value for `key`.
    pub fn prop(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, setting) = (key.into(), value.into());
        self.props.insert(name, setting);
        self
    }
}
impl CatalogBuilder for RedbCatalogBuilder {
type C = RedbCatalog;
fn with_storage_factory(mut self, storage_factory: Arc<dyn StorageFactory>) -> Self {
self.storage_factory = Some(storage_factory);
self
}
fn load(
mut self,
name: impl Into<String>,
props: HashMap<String, String>,
) -> impl Future<Output = Result<Self::C>> + Send {
for (k, v) in props {
self.props.insert(k, v);
}
if let Some(p) = self.props.remove(REDB_CATALOG_PROP_DB_PATH) {
self.db_path = Some(PathBuf::from(p));
}
if let Some(w) = self.props.remove(REDB_CATALOG_PROP_WAREHOUSE) {
self.warehouse_location = Some(w);
}
let cache_bytes_prop = self.props.remove(REDB_CATALOG_PROP_METADATA_CACHE_BYTES);
let table_cache_prop = self
.props
.remove(REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY);
let durability_prop = self.props.remove(REDB_CATALOG_PROP_DURABILITY);
let name = name.into();
let db_path = self.db_path.clone();
let warehouse_location = self.warehouse_location.clone();
let storage_factory = self.storage_factory.clone();
let metadata_cache_bytes = self.metadata_cache_bytes;
let table_handle_cache_capacity = self.table_handle_cache_capacity;
let durability = self.durability;
async move {
if name.trim().is_empty() {
return Err(Error::new(
ErrorKind::DataInvalid,
"Catalog name cannot be empty",
));
}
let db_path = db_path.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"redb catalog requires `{REDB_CATALOG_PROP_DB_PATH}` to be set, \
either via RedbCatalogBuilder::db_path or props"
),
)
})?;
let warehouse_location = warehouse_location.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"redb catalog requires `{REDB_CATALOG_PROP_WAREHOUSE}` to be set, \
either via RedbCatalogBuilder::warehouse_location or props"
),
)
})?;
let factory = storage_factory.ok_or_else(|| {
Error::new(
ErrorKind::Unexpected,
"StorageFactory must be provided for RedbCatalog. \
Use `with_storage_factory` to configure it.",
)
})?;
let metadata_cache_bytes = match cache_bytes_prop {
Some(s) => s.trim().parse::<u64>().map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!(
"`{REDB_CATALOG_PROP_METADATA_CACHE_BYTES}` must be a non-negative \
integer number of bytes, got `{s}`"
),
)
})?,
None => metadata_cache_bytes.unwrap_or(crate::DEFAULT_METADATA_CACHE_BYTES),
};
let table_handle_cache_capacity = match table_cache_prop {
Some(s) => s.trim().parse::<u64>().map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!(
"`{REDB_CATALOG_PROP_TABLE_HANDLE_CACHE_CAPACITY}` must be a \
non-negative integer, got `{s}`"
),
)
})?,
None => table_handle_cache_capacity
.unwrap_or(crate::DEFAULT_TABLE_HANDLE_CACHE_CAPACITY),
};
let durability = match durability_prop {
Some(s) => WriteDurability::parse(&s).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"`{REDB_CATALOG_PROP_DURABILITY}` must be one of \
immediate|eventual|none, got `{s}`"
),
)
})?,
None => durability.unwrap_or_default(),
};
let fileio = FileIOBuilder::new(factory).build();
if let Some(parent) = db_path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
format!("failed to create catalog dir {}: {e}", parent.display()),
)
})?;
}
}
let store = Store::open(db_path, durability.to_redb()).map_err(map_redb)?;
Ok(RedbCatalog {
name,
warehouse_location,
fileio,
store,
meta_cache: crate::meta_cache::MetadataCache::new(metadata_cache_bytes),
table_cache: crate::table_cache::TableHandleCache::new(table_handle_cache_capacity),
})
}
}
}