1use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::metrics_recorder::{MetricsRsRecorder, MixtricsBridge as MetricsRsRegistry};
11use super::slate::{SlateDbStorage, SlateDbStorageReader};
12use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
13use slatedb::DbReader;
14use slatedb::config::Settings;
15pub use slatedb::db_cache::DbCache;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18pub use slatedb::db_cache::{CachedEntry, CachedKey};
19use slatedb::object_store::{self, ObjectStore};
20pub use slatedb::{CompactorBuilder, DbBuilder};
21use tracing::info;
22use uuid::Uuid;
23
/// Owned handle to the foyer hybrid (memory + disk) cache that backs the
/// SlateDB block cache when one is built from configuration.
pub(crate) type OwnedHybridCache = foyer::HybridCache<CachedKey, CachedEntry>;

/// A block cache built by `create_block_cache_from_config`, kept in two forms.
///
/// Callers destructure it: `db_cache` goes to SlateDB (`with_db_cache`), while
/// `hybrid` is passed to the storage / reader via `new_with_managed_cache`
/// (presumably so the raw cache's lifetime is tied to the storage — confirm
/// against `SlateDbStorage`).
struct ManagedBlockCache {
    /// Type-erased cache handed to the SlateDB builder; it wraps a clone of
    /// `hybrid` (`FoyerHybridCache::new_with_cache(cache.clone())`).
    db_cache: Arc<dyn DbCache>,
    /// The underlying foyer hybrid cache handle.
    hybrid: OwnedHybridCache,
}
40
41pub struct StorageBuilder {
71 inner: StorageBuilderInner,
72 semantics: StorageSemantics,
73 managed_cache: Option<OwnedHybridCache>,
74}
75
76enum StorageBuilderInner {
77 InMemory,
78 SlateDb(Box<DbBuilder<String>>),
79}
80
81impl StorageBuilder {
82 pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
89 let mut managed_cache: Option<OwnedHybridCache> = None;
90 let inner = match config {
91 StorageConfig::InMemory => StorageBuilderInner::InMemory,
92 StorageConfig::SlateDb(slate_config) => {
93 let object_store = create_object_store(&slate_config.object_store)?;
94 let settings = match &slate_config.settings_path {
95 Some(path) => Settings::from_file(path).map_err(|e| {
96 StorageError::Storage(format!(
97 "Failed to load SlateDB settings from {}: {}",
98 path, e
99 ))
100 })?,
101 None => Settings::load().unwrap_or_default(),
102 };
103 info!(
104 "create slatedb storage with config: {:?}, settings: {:?}",
105 slate_config, settings
106 );
107 let mut db_builder =
108 DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
109 if let Some(managed) =
110 create_block_cache_from_config(&slate_config.block_cache).await?
111 {
112 db_builder = db_builder.with_db_cache(managed.db_cache);
113 managed_cache = Some(managed.hybrid);
114 }
115 StorageBuilderInner::SlateDb(Box::new(db_builder))
116 }
117 };
118 Ok(Self {
119 inner,
120 semantics: StorageSemantics::default(),
121 managed_cache,
122 })
123 }
124
125 pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
127 self.semantics = semantics;
128 self
129 }
130
131 pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
140 if let StorageBuilderInner::SlateDb(db) = self.inner {
141 self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
142 }
143 self
144 }
145
146 pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
150 match self.inner {
151 StorageBuilderInner::InMemory => {
152 let storage = match self.semantics.merge_operator {
153 Some(op) => InMemoryStorage::with_merge_operator(op),
154 None => InMemoryStorage::new(),
155 };
156 Ok(Arc::new(storage))
157 }
158 StorageBuilderInner::SlateDb(db_builder) => {
159 let mut db_builder = *db_builder;
160 db_builder = db_builder.with_metrics_recorder(Arc::new(MetricsRsRecorder));
161 if let Some(op) = self.semantics.merge_operator {
162 let adapter = SlateDbStorage::merge_operator_adapter(op);
163 db_builder = db_builder.with_merge_operator(Arc::new(adapter));
164 }
165 let db = db_builder.build().await.map_err(|e| {
166 StorageError::Storage(format!("Failed to create SlateDB: {}", e))
167 })?;
168 Ok(Arc::new(SlateDbStorage::new_with_managed_cache(
169 Arc::new(db),
170 self.managed_cache,
171 )))
172 }
173 }
174 }
175}
176
177#[derive(Default, Clone)]
183pub struct StorageReaderRuntime {
184 pub(crate) block_cache: Option<Arc<dyn DbCache>>,
185 pub(crate) object_store: Option<Arc<dyn ObjectStore>>,
186 pub(crate) checkpoint_id: Option<Uuid>,
187}
188
189impl StorageReaderRuntime {
190 pub fn new() -> Self {
192 Self::default()
193 }
194
195 pub fn block_cache(&self) -> Option<Arc<dyn DbCache>> {
196 self.block_cache.clone()
197 }
198
199 pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
207 self.block_cache = Some(cache);
208 self
209 }
210
211 pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
212 self.object_store = Some(object_store);
213 self
214 }
215
216 pub fn with_checkpoint_id(mut self, id: Uuid) -> Self {
224 self.checkpoint_id = Some(id);
225 self
226 }
227}
228
229#[derive(Default)]
252pub struct StorageSemantics {
253 pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
254}
255
256impl StorageSemantics {
257 pub fn new() -> Self {
259 Self::default()
260 }
261
262 pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
267 self.merge_operator = Some(op);
268 self
269 }
270}
271
272pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
277 match config {
278 ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
279 ObjectStoreConfig::Aws(aws_config) => {
280 let store = object_store::aws::AmazonS3Builder::from_env()
281 .with_region(&aws_config.region)
282 .with_bucket_name(&aws_config.bucket)
283 .build()
284 .map_err(|e| {
285 StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
286 })?;
287 Ok(Arc::new(store))
288 }
289 ObjectStoreConfig::Local(local_config) => {
290 std::fs::create_dir_all(&local_config.path).map_err(|e| {
291 StorageError::Storage(format!(
292 "Failed to create storage directory '{}': {}",
293 local_config.path, e
294 ))
295 })?;
296 let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
297 .map_err(|e| {
298 StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
299 })?;
300 Ok(Arc::new(store))
301 }
302 }
303}
304
305pub async fn create_storage_read(
323 config: &StorageConfig,
324 runtime: StorageReaderRuntime,
325 semantics: StorageSemantics,
326 reader_options: slatedb::config::DbReaderOptions,
327) -> StorageResult<Arc<dyn StorageRead>> {
328 match config {
329 StorageConfig::InMemory => {
330 let storage = match semantics.merge_operator {
332 Some(op) => InMemoryStorage::with_merge_operator(op),
333 None => InMemoryStorage::new(),
334 };
335 Ok(Arc::new(storage))
336 }
337 StorageConfig::SlateDb(slate_config) => {
338 let object_store = if let Some(object_store) = &runtime.object_store {
339 object_store.clone()
340 } else {
341 create_object_store(&slate_config.object_store)?
342 };
343
344 let mut builder = DbReader::builder(slate_config.path.clone(), object_store)
345 .with_options(reader_options)
346 .with_metrics_recorder(Arc::new(MetricsRsRecorder));
347 if let Some(checkpoint_id) = runtime.checkpoint_id {
348 builder = builder.with_checkpoint_id(checkpoint_id);
349 }
350 if let Some(op) = semantics.merge_operator {
351 let adapter = SlateDbStorage::merge_operator_adapter(op);
352 builder = builder.with_merge_operator(Arc::new(adapter));
353 }
354 let mut managed_cache: Option<OwnedHybridCache> = None;
358 if let Some(cache) = runtime.block_cache {
359 builder = builder.with_db_cache(cache);
360 } else if let Some(managed) =
361 create_block_cache_from_config(&slate_config.block_cache).await?
362 {
363 builder = builder.with_db_cache(managed.db_cache);
364 managed_cache = Some(managed.hybrid);
365 }
366 let reader = builder.build().await.map_err(|e| {
367 StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
368 })?;
369 Ok(Arc::new(SlateDbStorageReader::new_with_managed_cache(
370 Arc::new(reader),
371 managed_cache,
372 )))
373 }
374 }
375}
376
377async fn create_block_cache_from_config(
382 config: &Option<BlockCacheConfig>,
383) -> StorageResult<Option<ManagedBlockCache>> {
384 let Some(config) = config else {
385 return Ok(None);
386 };
387 match config {
388 BlockCacheConfig::FoyerHybrid(foyer_config) => {
389 use foyer::{
390 BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCacheBuilder,
391 HybridCachePolicy, PsyncIoEngineConfig,
392 };
393
394 let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
395 StorageError::Storage(format!(
396 "memory_capacity {} exceeds usize::MAX on this platform",
397 foyer_config.memory_capacity
398 ))
399 })?;
400 let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
401 StorageError::Storage(format!(
402 "disk_capacity {} exceeds usize::MAX on this platform",
403 foyer_config.disk_capacity
404 ))
405 })?;
406
407 let buffer_pool_size = usize::try_from(foyer_config.effective_buffer_pool_size())
408 .map_err(|_| {
409 StorageError::Storage(format!(
410 "buffer_pool_size {} exceeds usize::MAX on this platform",
411 foyer_config.effective_buffer_pool_size()
412 ))
413 })?;
414 let submit_queue_size_threshold =
415 usize::try_from(foyer_config.submit_queue_size_threshold).map_err(|_| {
416 StorageError::Storage(format!(
417 "submit_queue_size_threshold {} exceeds usize::MAX on this platform",
418 foyer_config.submit_queue_size_threshold
419 ))
420 })?;
421
422 let policy = match foyer_config.write_policy {
423 super::config::FoyerWritePolicy::WriteOnInsertion => {
424 HybridCachePolicy::WriteOnInsertion
425 }
426 super::config::FoyerWritePolicy::WriteOnEviction => {
427 HybridCachePolicy::WriteOnEviction
428 }
429 };
430
431 let device = {
432 #[cfg(target_os = "linux")]
433 let builder = FsDeviceBuilder::new(&foyer_config.disk_path)
434 .with_capacity(disk_capacity)
435 .with_direct(true);
436 #[cfg(not(target_os = "linux"))]
437 let builder =
438 FsDeviceBuilder::new(&foyer_config.disk_path).with_capacity(disk_capacity);
439 builder.build().map_err(|e| {
440 StorageError::Storage(format!("Failed to build foyer device: {}", e))
441 })?
442 };
443
444 let cache = HybridCacheBuilder::new()
445 .with_name("slatedb_block_cache")
446 .with_metrics_registry(Box::new(MetricsRsRegistry))
447 .with_policy(policy)
448 .memory(memory_capacity)
449 .with_weighter(|_, v: &CachedEntry| v.size())
450 .storage()
451 .with_io_engine_config(PsyncIoEngineConfig::new())
452 .with_engine_config(
453 BlockEngineConfig::new(device)
454 .with_flushers(foyer_config.flushers)
455 .with_buffer_pool_size(buffer_pool_size)
456 .with_submit_queue_size_threshold(submit_queue_size_threshold),
457 )
458 .build()
459 .await
460 .map_err(|e| {
461 StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
462 })?;
463
464 info!(
465 memory_mb = foyer_config.memory_capacity / (1024 * 1024),
466 disk_mb = foyer_config.disk_capacity / (1024 * 1024),
467 disk_path = %foyer_config.disk_path,
468 write_policy = ?foyer_config.write_policy,
469 flushers = foyer_config.flushers,
470 buffer_pool_mb = foyer_config.effective_buffer_pool_size() / (1024 * 1024),
471 submit_queue_threshold_mb =
472 foyer_config.submit_queue_size_threshold / (1024 * 1024),
473 "hybrid block cache enabled"
474 );
475
476 let db_cache =
477 Arc::new(FoyerHybridCache::new_with_cache(cache.clone())) as Arc<dyn DbCache>;
478 Ok(Some(ManagedBlockCache {
479 db_cache,
480 hybrid: cache,
481 }))
482 }
483 }
484}
485
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    /// Foyer hybrid cache config with the given capacities and disk path.
    /// The other knobs use fixed test values: 4 flushers, default buffer pool,
    /// and a 1 GiB submit queue threshold.
    fn foyer_cache_config(
        memory_capacity: u64,
        disk_capacity: u64,
        disk_path: String,
    ) -> FoyerHybridCacheConfig {
        FoyerHybridCacheConfig {
            memory_capacity,
            disk_capacity,
            disk_path,
            write_policy: Default::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        }
    }

    /// SlateDB config backed by a local object store at `dir`, with no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        };

        // Open (and then drop) a writer on the same config first, presumably
        // so the reader finds an initialized database to attach to.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    // Only meaningful where usize is 32 bits: on 64-bit targets u64::MAX fits
    // in usize, so the conversion would not fail.
    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        let config = BlockCacheConfig::FoyerHybrid(foyer_cache_config(
            u64::MAX,
            4 * 1024 * 1024,
            "/tmp/unused".to_string(),
        ));

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    /// SlateDB config whose foyer block cache points at `bad_disk_path`.
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_disk_path.to_string(),
            ))),
        })
    }

    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        // A regular file where the cache directory is expected.
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        // Run in a spawned task so the expected panic surfaces as a JoinError
        // (checked with `is_panic`) instead of aborting this test directly.
        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        // A regular file where the cache directory is expected.
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // Create the database with the same object store but without the bad
        // cache, so that only the reader's cache construction is under test.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Spawned so the expected panic is observable via `JoinError::is_panic`.
        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        // The config cache would fail (bad disk path), so success below shows
        // it was never built.
        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Memory-only cache supplied through the runtime override.
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}