1use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::slate::{SlateDbStorage, SlateDbStorageReader};
11use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
12use slatedb::DbReader;
13use slatedb::config::Settings;
14pub use slatedb::db_cache::CachedEntry;
15use slatedb::db_cache::DbCache;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18use slatedb::object_store::{self, ObjectStore};
19pub use slatedb::{CompactorBuilder, DbBuilder};
20use tracing::info;
21
/// Builds a [`Storage`] backend from a [`StorageConfig`].
///
/// Construct with [`StorageBuilder::new`], optionally customize via
/// [`StorageBuilder::with_semantics`] / [`StorageBuilder::map_slatedb`],
/// then call [`StorageBuilder::build`].
pub struct StorageBuilder {
    // Backend-specific construction state (in-memory marker or a
    // partially-configured SlateDB builder).
    inner: StorageBuilderInner,
    // Backend-agnostic behavior (e.g. merge operator) applied at build time.
    semantics: StorageSemantics,
}
55
// Backend selector held by `StorageBuilder` until `build` is called.
enum StorageBuilderInner {
    // No persistent backend; an `InMemoryStorage` is created at build time.
    InMemory,
    // Pre-configured SlateDB builder; boxed to keep the enum small, since
    // `DbBuilder` is carried by value until `build`.
    SlateDb(Box<DbBuilder<String>>),
}
60
61impl StorageBuilder {
62 pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
69 let inner = match config {
70 StorageConfig::InMemory => StorageBuilderInner::InMemory,
71 StorageConfig::SlateDb(slate_config) => {
72 let object_store = create_object_store(&slate_config.object_store)?;
73 let settings = match &slate_config.settings_path {
74 Some(path) => Settings::from_file(path).map_err(|e| {
75 StorageError::Storage(format!(
76 "Failed to load SlateDB settings from {}: {}",
77 path, e
78 ))
79 })?,
80 None => Settings::load().unwrap_or_default(),
81 };
82 info!(
83 "create slatedb storage with config: {:?}, settings: {:?}",
84 slate_config, settings
85 );
86 let mut db_builder =
87 DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
88 if let Some(cache) =
89 create_block_cache_from_config(&slate_config.block_cache).await?
90 {
91 db_builder = db_builder.with_db_cache(cache);
92 }
93 StorageBuilderInner::SlateDb(Box::new(db_builder))
94 }
95 };
96 Ok(Self {
97 inner,
98 semantics: StorageSemantics::default(),
99 })
100 }
101
102 pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
104 self.semantics = semantics;
105 self
106 }
107
108 pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
117 if let StorageBuilderInner::SlateDb(db) = self.inner {
118 self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
119 }
120 self
121 }
122
123 pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
127 match self.inner {
128 StorageBuilderInner::InMemory => {
129 let storage = match self.semantics.merge_operator {
130 Some(op) => InMemoryStorage::with_merge_operator(op),
131 None => InMemoryStorage::new(),
132 };
133 Ok(Arc::new(storage))
134 }
135 StorageBuilderInner::SlateDb(db_builder) => {
136 let mut db_builder = *db_builder;
137 if let Some(op) = self.semantics.merge_operator {
138 let adapter = SlateDbStorage::merge_operator_adapter(op);
139 db_builder = db_builder.with_merge_operator(Arc::new(adapter));
140 }
141 let db = db_builder.build().await.map_err(|e| {
142 StorageError::Storage(format!("Failed to create SlateDB: {}", e))
143 })?;
144 Ok(Arc::new(SlateDbStorage::new(Arc::new(db))))
145 }
146 }
147 }
148}
149
/// Process-level (runtime) resources for a storage reader, as opposed to
/// values derived from configuration.
#[derive(Default)]
pub struct StorageReaderRuntime {
    // Pre-built block cache; when set, `create_storage_read` uses it and
    // skips the config-driven cache entirely.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
}
159
160impl StorageReaderRuntime {
161 pub fn new() -> Self {
163 Self::default()
164 }
165
166 pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
174 self.block_cache = Some(cache);
175 self
176 }
177}
178
/// Backend-agnostic storage semantics shared by writers and readers.
#[derive(Default)]
pub struct StorageSemantics {
    // Optional merge operator installed on whichever backend is built.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
205
206impl StorageSemantics {
207 pub fn new() -> Self {
209 Self::default()
210 }
211
212 pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
217 self.merge_operator = Some(op);
218 self
219 }
220}
221
222pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
227 match config {
228 ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
229 ObjectStoreConfig::Aws(aws_config) => {
230 let store = object_store::aws::AmazonS3Builder::from_env()
231 .with_region(&aws_config.region)
232 .with_bucket_name(&aws_config.bucket)
233 .build()
234 .map_err(|e| {
235 StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
236 })?;
237 Ok(Arc::new(store))
238 }
239 ObjectStoreConfig::Local(local_config) => {
240 std::fs::create_dir_all(&local_config.path).map_err(|e| {
241 StorageError::Storage(format!(
242 "Failed to create storage directory '{}': {}",
243 local_config.path, e
244 ))
245 })?;
246 let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
247 .map_err(|e| {
248 StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
249 })?;
250 Ok(Arc::new(store))
251 }
252 }
253}
254
255pub async fn create_storage_read(
273 config: &StorageConfig,
274 runtime: StorageReaderRuntime,
275 semantics: StorageSemantics,
276 reader_options: slatedb::config::DbReaderOptions,
277) -> StorageResult<Arc<dyn StorageRead>> {
278 match config {
279 StorageConfig::InMemory => {
280 let storage = match semantics.merge_operator {
282 Some(op) => InMemoryStorage::with_merge_operator(op),
283 None => InMemoryStorage::new(),
284 };
285 Ok(Arc::new(storage))
286 }
287 StorageConfig::SlateDb(slate_config) => {
288 let object_store = create_object_store(&slate_config.object_store)?;
289
290 let mut options = reader_options;
291 if let Some(op) = semantics.merge_operator {
292 let adapter = SlateDbStorage::merge_operator_adapter(op);
293 options.merge_operator = Some(Arc::new(adapter));
294 }
295 if let Some(cache) = runtime.block_cache {
297 options.block_cache = Some(cache);
298 } else if let Some(cache) =
299 create_block_cache_from_config(&slate_config.block_cache).await?
300 {
301 options.block_cache = Some(cache);
302 }
303 let reader = DbReader::open(
304 slate_config.path.clone(),
305 object_store,
306 None, options,
308 )
309 .await
310 .map_err(|e| {
311 StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
312 })?;
313 Ok(Arc::new(SlateDbStorageReader::new(Arc::new(reader))))
314 }
315 }
316}
317
318async fn create_block_cache_from_config(
320 config: &Option<BlockCacheConfig>,
321) -> StorageResult<Option<Arc<dyn DbCache>>> {
322 let Some(config) = config else {
323 return Ok(None);
324 };
325 match config {
326 BlockCacheConfig::FoyerHybrid(foyer_config) => {
327 use foyer::{DirectFsDeviceOptions, Engine, HybridCacheBuilder};
328
329 let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
330 StorageError::Storage(format!(
331 "memory_capacity {} exceeds usize::MAX on this platform",
332 foyer_config.memory_capacity
333 ))
334 })?;
335 let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
336 StorageError::Storage(format!(
337 "disk_capacity {} exceeds usize::MAX on this platform",
338 foyer_config.disk_capacity
339 ))
340 })?;
341
342 let cache = HybridCacheBuilder::new()
343 .with_name("slatedb_block_cache")
344 .memory(memory_capacity)
345 .with_weighter(|_, v: &CachedEntry| v.size())
346 .storage(Engine::large())
347 .with_device_options(
348 DirectFsDeviceOptions::new(&foyer_config.disk_path)
349 .with_capacity(disk_capacity),
350 )
351 .build()
352 .await
353 .map_err(|e| {
354 StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
355 })?;
356
357 info!(
358 memory_mb = foyer_config.memory_capacity / (1024 * 1024),
359 disk_mb = foyer_config.disk_capacity / (1024 * 1024),
360 disk_path = %foyer_config.disk_path,
361 "hybrid block cache enabled"
362 );
363
364 Ok(Some(
365 Arc::new(FoyerHybridCache::new_with_cache(cache)) as Arc<dyn DbCache>
366 ))
367 }
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    // Minimal SlateDB config backed by a local directory, with no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    // Writer path: a config-declared foyer hybrid cache should be built and
    // wired into the storage without error.
    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: cache_dir.to_str().unwrap().to_string(),
            })),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    // Reader path: same as above, but for `create_storage_read`. A writer is
    // built first so the SlateDB manifest exists for the reader to open.
    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: cache_dir.to_str().unwrap().to_string(),
            })),
        };

        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    // Only meaningful on 32-bit targets, where u64::MAX cannot fit in usize;
    // on 64-bit the try_from conversion can never fail for these values.
    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        let config = BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
            memory_capacity: u64::MAX,
            disk_capacity: 4 * 1024 * 1024,
            disk_path: "/tmp/unused".to_string(),
        });

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    // Config whose block-cache disk_path is deliberately unusable (callers
    // pass a path that is a regular file, not a directory).
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_disk_path.to_string(),
            })),
        })
    }

    // foyer panics (rather than erroring) on an unusable disk_path, so the
    // build is run in a spawned task and the panic is observed via JoinError.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        // Create a regular file where a cache directory is expected.
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    // Reader-side variant of the panic test above. The writer is built with
    // the cache disabled so only the reader hits the bad disk_path.
    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_path.to_str().unwrap().to_string(),
            })),
        };

        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    // The config cache has an invalid disk_path, so the reader can only
    // succeed if the runtime-supplied cache took precedence and the config
    // cache was never constructed.
    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_path.to_str().unwrap().to_string(),
            })),
        };

        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // In-memory-only cache supplied at runtime; never touches disk.
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    // No block cache configured -> Ok(None), not an error.
    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    // Baseline: SlateDB storage builds fine with the cache left unset.
    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}