1use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::metrics_recorder::{MetricsRsRecorder, MixtricsBridge as MetricsRsRegistry};
11use super::slate::{SlateDbStorage, SlateDbStorageReader};
12use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
13use slatedb::DbReader;
14use slatedb::config::Settings;
15pub use slatedb::db_cache::CachedEntry;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18use slatedb::db_cache::{CachedKey, DbCache};
19use slatedb::object_store::{self, ObjectStore};
20pub use slatedb::{CompactorBuilder, DbBuilder};
21use tracing::info;
22
/// Concrete foyer hybrid (memory + disk) cache type used as the SlateDB block cache.
pub(crate) type OwnedHybridCache = foyer::HybridCache<CachedKey, CachedEntry>;

/// A block cache built from configuration, carried in two forms: type-erased
/// for SlateDB, and as an owned handle to the same underlying hybrid cache.
// NOTE(review): the owned `hybrid` handle is forwarded into
// `SlateDbStorage{,Reader}::new_with_managed_cache` — presumably so the storage
// can shut the cache down explicitly; confirm in those constructors.
struct ManagedBlockCache {
    // Type-erased cache passed to SlateDB via `with_db_cache`.
    db_cache: Arc<dyn DbCache>,
    // Owned handle to the same foyer hybrid cache backing `db_cache`.
    hybrid: OwnedHybridCache,
}
39
/// Builder that turns a [`StorageConfig`] into a ready [`Storage`] backend
/// (in-memory or SlateDB-backed).
pub struct StorageBuilder {
    // Backend-specific builder state selected from the config.
    inner: StorageBuilderInner,
    // Backend-agnostic behavior knobs (currently the merge operator).
    semantics: StorageSemantics,
    // Hybrid block cache created from config, if any; handed to the built
    // storage so it owns the cache's lifecycle.
    managed_cache: Option<OwnedHybridCache>,
}
74
/// Backend selector holding the backend-specific builder state.
enum StorageBuilderInner {
    // In-memory backend; nothing to prepare until `build`.
    InMemory,
    // SlateDB builder; boxed to keep the enum (and thus `StorageBuilder`) small.
    SlateDb(Box<DbBuilder<String>>),
}
79
80impl StorageBuilder {
81 pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
88 let mut managed_cache: Option<OwnedHybridCache> = None;
89 let inner = match config {
90 StorageConfig::InMemory => StorageBuilderInner::InMemory,
91 StorageConfig::SlateDb(slate_config) => {
92 let object_store = create_object_store(&slate_config.object_store)?;
93 let settings = match &slate_config.settings_path {
94 Some(path) => Settings::from_file(path).map_err(|e| {
95 StorageError::Storage(format!(
96 "Failed to load SlateDB settings from {}: {}",
97 path, e
98 ))
99 })?,
100 None => Settings::load().unwrap_or_default(),
101 };
102 info!(
103 "create slatedb storage with config: {:?}, settings: {:?}",
104 slate_config, settings
105 );
106 let mut db_builder =
107 DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
108 if let Some(managed) =
109 create_block_cache_from_config(&slate_config.block_cache).await?
110 {
111 db_builder = db_builder.with_db_cache(managed.db_cache);
112 managed_cache = Some(managed.hybrid);
113 }
114 StorageBuilderInner::SlateDb(Box::new(db_builder))
115 }
116 };
117 Ok(Self {
118 inner,
119 semantics: StorageSemantics::default(),
120 managed_cache,
121 })
122 }
123
124 pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
126 self.semantics = semantics;
127 self
128 }
129
130 pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
139 if let StorageBuilderInner::SlateDb(db) = self.inner {
140 self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
141 }
142 self
143 }
144
145 pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
149 match self.inner {
150 StorageBuilderInner::InMemory => {
151 let storage = match self.semantics.merge_operator {
152 Some(op) => InMemoryStorage::with_merge_operator(op),
153 None => InMemoryStorage::new(),
154 };
155 Ok(Arc::new(storage))
156 }
157 StorageBuilderInner::SlateDb(db_builder) => {
158 let mut db_builder = *db_builder;
159 db_builder = db_builder.with_metrics_recorder(Arc::new(MetricsRsRecorder));
160 if let Some(op) = self.semantics.merge_operator {
161 let adapter = SlateDbStorage::merge_operator_adapter(op);
162 db_builder = db_builder.with_merge_operator(Arc::new(adapter));
163 }
164 let db = db_builder.build().await.map_err(|e| {
165 StorageError::Storage(format!("Failed to create SlateDB: {}", e))
166 })?;
167 Ok(Arc::new(SlateDbStorage::new_with_managed_cache(
168 Arc::new(db),
169 self.managed_cache,
170 )))
171 }
172 }
173 }
174}
175
/// Caller-supplied runtime overrides used when building a [`StorageRead`].
///
/// Each field, when set, takes precedence over the equivalent value derived
/// from the storage configuration (see `create_storage_read`).
#[derive(Default, Clone)]
pub struct StorageReaderRuntime {
    // Injected block cache; overrides any cache built from config.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
    // Injected object store; overrides the store built from config.
    pub(crate) object_store: Option<Arc<dyn ObjectStore>>,
}
186
187impl StorageReaderRuntime {
188 pub fn new() -> Self {
190 Self::default()
191 }
192
193 pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
201 self.block_cache = Some(cache);
202 self
203 }
204
205 pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
206 self.object_store = Some(object_store);
207 self
208 }
209}
210
/// Backend-agnostic behavior configuration shared by writer and reader
/// construction paths.
#[derive(Default)]
pub struct StorageSemantics {
    // Optional merge operator applied by both in-memory and SlateDB backends.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
237
238impl StorageSemantics {
239 pub fn new() -> Self {
241 Self::default()
242 }
243
244 pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
249 self.merge_operator = Some(op);
250 self
251 }
252}
253
254pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
259 match config {
260 ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
261 ObjectStoreConfig::Aws(aws_config) => {
262 let store = object_store::aws::AmazonS3Builder::from_env()
263 .with_region(&aws_config.region)
264 .with_bucket_name(&aws_config.bucket)
265 .build()
266 .map_err(|e| {
267 StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
268 })?;
269 Ok(Arc::new(store))
270 }
271 ObjectStoreConfig::Local(local_config) => {
272 std::fs::create_dir_all(&local_config.path).map_err(|e| {
273 StorageError::Storage(format!(
274 "Failed to create storage directory '{}': {}",
275 local_config.path, e
276 ))
277 })?;
278 let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
279 .map_err(|e| {
280 StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
281 })?;
282 Ok(Arc::new(store))
283 }
284 }
285}
286
287pub async fn create_storage_read(
305 config: &StorageConfig,
306 runtime: StorageReaderRuntime,
307 semantics: StorageSemantics,
308 reader_options: slatedb::config::DbReaderOptions,
309) -> StorageResult<Arc<dyn StorageRead>> {
310 match config {
311 StorageConfig::InMemory => {
312 let storage = match semantics.merge_operator {
314 Some(op) => InMemoryStorage::with_merge_operator(op),
315 None => InMemoryStorage::new(),
316 };
317 Ok(Arc::new(storage))
318 }
319 StorageConfig::SlateDb(slate_config) => {
320 let object_store = if let Some(object_store) = &runtime.object_store {
321 object_store.clone()
322 } else {
323 create_object_store(&slate_config.object_store)?
324 };
325
326 let mut builder = DbReader::builder(slate_config.path.clone(), object_store)
327 .with_options(reader_options)
328 .with_metrics_recorder(Arc::new(MetricsRsRecorder));
329 if let Some(op) = semantics.merge_operator {
330 let adapter = SlateDbStorage::merge_operator_adapter(op);
331 builder = builder.with_merge_operator(Arc::new(adapter));
332 }
333 let mut managed_cache: Option<OwnedHybridCache> = None;
337 if let Some(cache) = runtime.block_cache {
338 builder = builder.with_db_cache(cache);
339 } else if let Some(managed) =
340 create_block_cache_from_config(&slate_config.block_cache).await?
341 {
342 builder = builder.with_db_cache(managed.db_cache);
343 managed_cache = Some(managed.hybrid);
344 }
345 let reader = builder.build().await.map_err(|e| {
346 StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
347 })?;
348 Ok(Arc::new(SlateDbStorageReader::new_with_managed_cache(
349 Arc::new(reader),
350 managed_cache,
351 )))
352 }
353 }
354}
355
356async fn create_block_cache_from_config(
361 config: &Option<BlockCacheConfig>,
362) -> StorageResult<Option<ManagedBlockCache>> {
363 let Some(config) = config else {
364 return Ok(None);
365 };
366 match config {
367 BlockCacheConfig::FoyerHybrid(foyer_config) => {
368 use foyer::{
369 DirectFsDeviceOptions, Engine, HybridCacheBuilder, HybridCachePolicy,
370 LargeEngineOptions,
371 };
372
373 let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
374 StorageError::Storage(format!(
375 "memory_capacity {} exceeds usize::MAX on this platform",
376 foyer_config.memory_capacity
377 ))
378 })?;
379 let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
380 StorageError::Storage(format!(
381 "disk_capacity {} exceeds usize::MAX on this platform",
382 foyer_config.disk_capacity
383 ))
384 })?;
385
386 let buffer_pool_size = usize::try_from(foyer_config.effective_buffer_pool_size())
387 .map_err(|_| {
388 StorageError::Storage(format!(
389 "buffer_pool_size {} exceeds usize::MAX on this platform",
390 foyer_config.effective_buffer_pool_size()
391 ))
392 })?;
393 let submit_queue_size_threshold =
394 usize::try_from(foyer_config.submit_queue_size_threshold).map_err(|_| {
395 StorageError::Storage(format!(
396 "submit_queue_size_threshold {} exceeds usize::MAX on this platform",
397 foyer_config.submit_queue_size_threshold
398 ))
399 })?;
400
401 let policy = match foyer_config.write_policy {
402 super::config::FoyerWritePolicy::WriteOnInsertion => {
403 HybridCachePolicy::WriteOnInsertion
404 }
405 super::config::FoyerWritePolicy::WriteOnEviction => {
406 HybridCachePolicy::WriteOnEviction
407 }
408 };
409
410 let cache = HybridCacheBuilder::new()
411 .with_name("slatedb_block_cache")
412 .with_metrics_registry(Box::new(MetricsRsRegistry))
413 .with_policy(policy)
414 .memory(memory_capacity)
415 .with_weighter(|_, v: &CachedEntry| v.size())
416 .storage(Engine::Large(
417 LargeEngineOptions::new()
418 .with_flushers(foyer_config.flushers)
419 .with_buffer_pool_size(buffer_pool_size)
420 .with_submit_queue_size_threshold(submit_queue_size_threshold),
421 ))
422 .with_device_options(
423 DirectFsDeviceOptions::new(&foyer_config.disk_path)
424 .with_capacity(disk_capacity),
425 )
426 .build()
427 .await
428 .map_err(|e| {
429 StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
430 })?;
431
432 info!(
433 memory_mb = foyer_config.memory_capacity / (1024 * 1024),
434 disk_mb = foyer_config.disk_capacity / (1024 * 1024),
435 disk_path = %foyer_config.disk_path,
436 write_policy = ?foyer_config.write_policy,
437 flushers = foyer_config.flushers,
438 buffer_pool_mb = foyer_config.effective_buffer_pool_size() / (1024 * 1024),
439 submit_queue_threshold_mb =
440 foyer_config.submit_queue_size_threshold / (1024 * 1024),
441 "hybrid block cache enabled"
442 );
443
444 let db_cache =
445 Arc::new(FoyerHybridCache::new_with_cache(cache.clone())) as Arc<dyn DbCache>;
446 Ok(Some(ManagedBlockCache {
447 db_cache,
448 hybrid: cache,
449 }))
450 }
451 }
452}
453
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    // Builds a foyer hybrid cache config with the given capacities/path and
    // fixed flusher / buffer-pool / submit-queue settings.
    fn foyer_cache_config(
        memory_capacity: u64,
        disk_capacity: u64,
        disk_path: String,
    ) -> FoyerHybridCacheConfig {
        FoyerHybridCacheConfig {
            memory_capacity,
            disk_capacity,
            disk_path,
            write_policy: Default::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        }
    }

    // SlateDB config backed by a local object store rooted at `dir`, with no
    // settings file and no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    // Happy path: a config-declared hybrid block cache is created and the
    // writer-side storage builds successfully.
    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    // Happy path for the reader: after a writer initializes the DB, a reader
    // with a config-declared block cache builds successfully.
    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        };

        // Create the database first so the reader has a manifest to open.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    // Only meaningful on 32-bit targets, where a u64 capacity can exceed
    // usize::MAX and the conversion must fail with an error (not a panic).
    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        let config = BlockCacheConfig::FoyerHybrid(foyer_cache_config(
            u64::MAX,
            4 * 1024 * 1024,
            "/tmp/unused".to_string(),
        ));

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    // Config whose block-cache disk path is deliberately unusable (points at
    // a regular file instead of a directory).
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_disk_path.to_string(),
            ))),
        })
    }

    // foyer currently panics (rather than returning an error) on an invalid
    // disk path, so the build is run in a spawned task and the join error is
    // asserted to be a panic.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        // A regular file where a directory is expected.
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    // Same invalid-disk-path panic expectation, but on the reader path; the
    // writer is created without a cache first so the DB exists.
    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // Initialize the DB without the (broken) cache so only the reader
        // exercises the invalid disk path.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    // When the runtime supplies a cache, the (invalid) config-declared cache
    // must never be constructed — success proves the precedence order.
    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            // Would panic if ever built (see the invalid-disk-path tests).
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // In-memory runtime cache injected by the caller.
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    // No cache configured -> no cache built.
    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    // Storage builds fine with block_cache omitted entirely.
    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}