use std::sync::Arc;
use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
use super::in_memory::InMemoryStorage;
use super::slate::{SlateDbStorage, SlateDbStorageReader};
use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
use slatedb::DbReader;
use slatedb::config::Settings;
pub use slatedb::db_cache::CachedEntry;
use slatedb::db_cache::DbCache;
pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
use slatedb::object_store::{self, ObjectStore};
pub use slatedb::{CompactorBuilder, DbBuilder};
use tracing::info;
/// Fluent builder that assembles an [`Arc<dyn Storage>`] from a [`StorageConfig`].
///
/// Construct with [`StorageBuilder::new`], optionally adjust via
/// [`StorageBuilder::with_semantics`] / [`StorageBuilder::map_slatedb`], then
/// finalize with [`StorageBuilder::build`].
pub struct StorageBuilder {
    // Backend-specific builder state: an in-memory marker or a SlateDB builder.
    inner: StorageBuilderInner,
    // Cross-backend semantics (currently the optional merge operator),
    // applied to the concrete backend at `build()` time.
    semantics: StorageSemantics,
}
/// Backend variant held by [`StorageBuilder`].
enum StorageBuilderInner {
    // No pre-build state needed; `build()` constructs an `InMemoryStorage` directly.
    InMemory,
    // SlateDB builder, carried behind a `Box` (presumably to keep this enum
    // small relative to the `DbBuilder` payload — TODO confirm).
    SlateDb(Box<DbBuilder<String>>),
}
impl StorageBuilder {
    /// Creates a builder for the backend described by `config`.
    ///
    /// For a SlateDB backend this resolves the object store, loads settings
    /// (from `settings_path` when given, otherwise via the environment with a
    /// fallback to defaults) and wires up an optional config-driven block
    /// cache. Semantics default to none; override with [`Self::with_semantics`].
    ///
    /// # Errors
    /// Returns [`StorageError::Storage`] when the object store cannot be
    /// created, an explicit settings file fails to load, or the configured
    /// block cache cannot be built.
    pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
        let inner = match config {
            StorageConfig::InMemory => StorageBuilderInner::InMemory,
            StorageConfig::SlateDb(slate_config) => {
                let object_store = create_object_store(&slate_config.object_store)?;
                // An explicit settings file is authoritative; otherwise fall
                // back to the environment loader, and finally to defaults.
                let settings = if let Some(path) = &slate_config.settings_path {
                    Settings::from_file(path).map_err(|e| {
                        StorageError::Storage(format!(
                            "Failed to load SlateDB settings from {}: {}",
                            path, e
                        ))
                    })?
                } else {
                    Settings::load().unwrap_or_default()
                };
                info!(
                    "create slatedb storage with config: {:?}, settings: {:?}",
                    slate_config, settings
                );
                let mut builder = DbBuilder::new(slate_config.path.clone(), object_store)
                    .with_settings(settings);
                let maybe_cache =
                    create_block_cache_from_config(&slate_config.block_cache).await?;
                if let Some(cache) = maybe_cache {
                    builder = builder.with_db_cache(cache);
                }
                StorageBuilderInner::SlateDb(Box::new(builder))
            }
        };
        Ok(Self {
            inner,
            semantics: StorageSemantics::default(),
        })
    }

    /// Replaces the semantics (e.g. merge operator) applied at build time.
    pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
        self.semantics = semantics;
        self
    }

    /// Applies `f` to the underlying SlateDB builder.
    ///
    /// This is a no-op for the in-memory backend.
    pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
        if let StorageBuilderInner::SlateDb(db) = self.inner {
            self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
        }
        self
    }

    /// Finalizes the builder into a ready-to-use [`Storage`] handle.
    ///
    /// The configured merge operator, if any, is installed on the concrete
    /// backend before construction.
    ///
    /// # Errors
    /// Returns [`StorageError::Storage`] when the SlateDB database cannot be
    /// opened.
    pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
        match self.inner {
            StorageBuilderInner::InMemory => {
                let storage = if let Some(op) = self.semantics.merge_operator {
                    InMemoryStorage::with_merge_operator(op)
                } else {
                    InMemoryStorage::new()
                };
                Ok(Arc::new(storage))
            }
            StorageBuilderInner::SlateDb(boxed) => {
                let mut builder = *boxed;
                if let Some(op) = self.semantics.merge_operator {
                    let adapter = SlateDbStorage::merge_operator_adapter(op);
                    builder = builder.with_merge_operator(Arc::new(adapter));
                }
                let db = builder.build().await.map_err(|e| {
                    StorageError::Storage(format!("Failed to create SlateDB: {}", e))
                })?;
                Ok(Arc::new(SlateDbStorage::new(Arc::new(db))))
            }
        }
    }
}
/// Runtime-supplied resources for a storage reader.
#[derive(Default)]
pub struct StorageReaderRuntime {
    // Optional pre-built block cache. When set, `create_storage_read` uses it
    // and skips building a cache from `BlockCacheConfig`.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
}
impl StorageReaderRuntime {
    /// Creates an empty runtime with no block-cache override.
    pub fn new() -> Self {
        Self::default()
    }

    /// Installs a block cache; this takes precedence over any cache declared
    /// in the storage configuration when opening a reader.
    pub fn with_block_cache(self, cache: Arc<dyn DbCache>) -> Self {
        Self {
            block_cache: Some(cache),
        }
    }
}
/// Backend-independent storage semantics, shared by writers and readers.
#[derive(Default)]
pub struct StorageSemantics {
    // Optional merge operator installed on the concrete backend
    // (in-memory directly, SlateDB via an adapter).
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
impl StorageSemantics {
    /// Creates semantics with no merge operator configured.
    pub fn new() -> Self {
        Self::default()
    }

    /// Attaches the merge operator the storage backend should use.
    pub fn with_merge_operator(self, op: Arc<dyn MergeOperator>) -> Self {
        Self {
            merge_operator: Some(op),
        }
    }
}
/// Instantiates the [`ObjectStore`] backend described by `config`.
///
/// - `InMemory`: a process-local store.
/// - `Aws`: S3, with credentials resolved from the environment.
/// - `Local`: a local filesystem directory, created if it does not exist.
///
/// # Errors
/// Returns [`StorageError::Storage`] when the S3 builder fails, the local
/// directory cannot be created, or the local store cannot be constructed.
pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
    match config {
        ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
        ObjectStoreConfig::Aws(aws_config) => object_store::aws::AmazonS3Builder::from_env()
            .with_region(&aws_config.region)
            .with_bucket_name(&aws_config.bucket)
            .build()
            .map(|store| Arc::new(store) as Arc<dyn ObjectStore>)
            .map_err(|e| {
                StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
            }),
        ObjectStoreConfig::Local(local_config) => {
            // The prefix directory must exist before the store can be opened.
            std::fs::create_dir_all(&local_config.path).map_err(|e| {
                StorageError::Storage(format!(
                    "Failed to create storage directory '{}': {}",
                    local_config.path, e
                ))
            })?;
            object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
                .map(|store| Arc::new(store) as Arc<dyn ObjectStore>)
                .map_err(|e| {
                    StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
                })
        }
    }
}
/// Opens a read-only view of the storage described by `config`.
///
/// `runtime` may inject a pre-built block cache, which takes precedence over
/// any cache declared in the config; `semantics` supplies an optional merge
/// operator; `reader_options` seeds the SlateDB reader options, and its
/// `block_cache` is left untouched when neither source provides a cache.
///
/// # Errors
/// Returns [`StorageError::Storage`] when the object store, the configured
/// block cache, or the SlateDB reader cannot be created.
pub async fn create_storage_read(
    config: &StorageConfig,
    runtime: StorageReaderRuntime,
    semantics: StorageSemantics,
    reader_options: slatedb::config::DbReaderOptions,
) -> StorageResult<Arc<dyn StorageRead>> {
    match config {
        StorageConfig::InMemory => {
            let storage = if let Some(op) = semantics.merge_operator {
                InMemoryStorage::with_merge_operator(op)
            } else {
                InMemoryStorage::new()
            };
            Ok(Arc::new(storage))
        }
        StorageConfig::SlateDb(slate_config) => {
            let object_store = create_object_store(&slate_config.object_store)?;
            let mut options = reader_options;
            if let Some(op) = semantics.merge_operator {
                let adapter = SlateDbStorage::merge_operator_adapter(op);
                options.merge_operator = Some(Arc::new(adapter));
            }
            // Runtime-injected cache wins; the config-driven cache is only
            // built (and its cost paid) when no cache was injected.
            let cache_override = match runtime.block_cache {
                Some(cache) => Some(cache),
                None => create_block_cache_from_config(&slate_config.block_cache).await?,
            };
            if let Some(cache) = cache_override {
                options.block_cache = Some(cache);
            }
            let reader = DbReader::open(slate_config.path.clone(), object_store, None, options)
                .await
                .map_err(|e| {
                    StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
                })?;
            Ok(Arc::new(SlateDbStorageReader::new(Arc::new(reader))))
        }
    }
}
/// Builds the optional block cache described by `config`.
///
/// Returns `Ok(None)` when no cache is configured. Capacities are validated
/// to fit in `usize` before the cache is constructed, so oversized values
/// fail with an error rather than truncating on 32-bit targets.
///
/// # Errors
/// Returns [`StorageError::Storage`] when a capacity exceeds `usize::MAX` or
/// the hybrid cache fails to initialize.
async fn create_block_cache_from_config(
    config: &Option<BlockCacheConfig>,
) -> StorageResult<Option<Arc<dyn DbCache>>> {
    let Some(config) = config else {
        return Ok(None);
    };
    match config {
        BlockCacheConfig::FoyerHybrid(foyer_config) => {
            use foyer::{DirectFsDeviceOptions, Engine, HybridCacheBuilder};

            let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
                StorageError::Storage(format!(
                    "memory_capacity {} exceeds usize::MAX on this platform",
                    foyer_config.memory_capacity
                ))
            })?;
            let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
                StorageError::Storage(format!(
                    "disk_capacity {} exceeds usize::MAX on this platform",
                    foyer_config.disk_capacity
                ))
            })?;
            let device_options =
                DirectFsDeviceOptions::new(&foyer_config.disk_path).with_capacity(disk_capacity);
            let hybrid = HybridCacheBuilder::new()
                .with_name("slatedb_block_cache")
                .memory(memory_capacity)
                // Weight entries by their byte size so capacity is honoured.
                .with_weighter(|_, entry: &CachedEntry| entry.size())
                .storage(Engine::large())
                .with_device_options(device_options)
                .build()
                .await
                .map_err(|e| {
                    StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
                })?;
            info!(
                memory_mb = foyer_config.memory_capacity / (1024 * 1024),
                disk_mb = foyer_config.disk_capacity / (1024 * 1024),
                disk_path = %foyer_config.disk_path,
                "hybrid block cache enabled"
            );
            let cache: Arc<dyn DbCache> = Arc::new(FoyerHybridCache::new_with_cache(hybrid));
            Ok(Some(cache))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };
    // Minimal SlateDB config backed by a local directory, with no settings
    // file and no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }
    // A valid FoyerHybrid block-cache config should be built and attached by
    // StorageBuilder without error.
    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();
        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: cache_dir.to_str().unwrap().to_string(),
            })),
        });
        let storage = StorageBuilder::new(&config).await.unwrap().build().await;
        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }
    // The reader path should also honor a config-driven block cache. A writer
    // is created (and dropped) first so the reader has a database to open.
    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();
        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: cache_dir.to_str().unwrap().to_string(),
            })),
        };
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        drop(writer);
        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;
        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }
    // Only meaningful on 32-bit targets, where u64::MAX cannot fit in usize
    // and the capacity conversion must fail cleanly.
    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        let config = BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
            memory_capacity: u64::MAX,
            disk_capacity: 4 * 1024 * 1024,
            disk_path: "/tmp/unused".to_string(),
        });
        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }
    // Config whose block-cache disk_path points at something unusable (the
    // tests below pass a path occupied by a regular file).
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_disk_path.to_string(),
            })),
        })
    }
    // foyer panics (rather than erroring) on an invalid disk_path, so the
    // build is run in a spawned task and the panic observed via the JoinError.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();
        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );
        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }
    // Same invalid-disk_path panic expectation, exercised through the reader
    // path. The writer is created without a cache so only the reader hits it.
    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();
        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_path.to_str().unwrap().to_string(),
            })),
        };
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);
        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }
    // With a runtime-injected cache, the (invalid, would-panic) config cache
    // must never be built — proving the runtime cache takes precedence.
    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();
        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_path.to_str().unwrap().to_string(),
            })),
        };
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));
        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;
        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }
    // No block-cache config → no cache built, and no error.
    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }
    // The block cache is optional end-to-end: storage builds fine without one.
    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());
        let storage = StorageBuilder::new(&config).await.unwrap().build().await;
        assert!(storage.is_ok());
    }
}