1use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::metrics_recorder::{MetricsRsRecorder, MixtricsBridge as MetricsRsRegistry};
11use super::slate::{SlateDbStorage, SlateDbStorageReader};
12use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
13use slatedb::DbReader;
14use slatedb::config::Settings;
15pub use slatedb::db_cache::CachedEntry;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18use slatedb::db_cache::{CachedKey, DbCache};
19use slatedb::object_store::{self, ObjectStore};
20pub use slatedb::{CompactorBuilder, DbBuilder};
21use tracing::info;
22use uuid::Uuid;
23
/// Concrete foyer hybrid (memory + disk) cache keyed/valued with SlateDB's
/// block-cache types. Retained in owned form (see `ManagedBlockCache`) in
/// addition to the type-erased `Arc<dyn DbCache>` handed to SlateDB.
pub(crate) type OwnedHybridCache = foyer::HybridCache<CachedKey, CachedEntry>;
32
/// A block cache created from configuration by `create_block_cache_from_config`.
///
/// Carries two views of the same cache: the type-erased handle SlateDB needs,
/// and the owned concrete hybrid cache that is handed to the storage wrapper
/// via `new_with_managed_cache` (presumably so the wrapper can keep it alive
/// and close it on shutdown — TODO confirm in `SlateDbStorage`).
struct ManagedBlockCache {
    /// Type-erased view passed to SlateDB through `with_db_cache`.
    db_cache: Arc<dyn DbCache>,
    /// Owned concrete foyer hybrid cache backing `db_cache`.
    hybrid: OwnedHybridCache,
}
40
/// Fluent builder for a [`Storage`] backend, seeded from a [`StorageConfig`].
///
/// Construct with [`StorageBuilder::new`], optionally tweak via
/// [`StorageBuilder::with_semantics`] / [`StorageBuilder::map_slatedb`],
/// then call [`StorageBuilder::build`].
pub struct StorageBuilder {
    /// Backend-specific builder state (in-memory or SlateDB).
    inner: StorageBuilderInner,
    /// Cross-backend behavior knobs (currently only the merge operator).
    semantics: StorageSemantics,
    /// Hybrid cache built from config, if any; forwarded to the storage on
    /// `build` so ownership of the concrete cache is retained.
    managed_cache: Option<OwnedHybridCache>,
}
75
/// Which backend a [`StorageBuilder`] will produce.
enum StorageBuilderInner {
    /// Backed by `InMemoryStorage`.
    InMemory,
    /// Backed by SlateDB; boxed to keep the enum small.
    SlateDb(Box<DbBuilder<String>>),
}
80
81impl StorageBuilder {
82 pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
89 let mut managed_cache: Option<OwnedHybridCache> = None;
90 let inner = match config {
91 StorageConfig::InMemory => StorageBuilderInner::InMemory,
92 StorageConfig::SlateDb(slate_config) => {
93 let object_store = create_object_store(&slate_config.object_store)?;
94 let settings = match &slate_config.settings_path {
95 Some(path) => Settings::from_file(path).map_err(|e| {
96 StorageError::Storage(format!(
97 "Failed to load SlateDB settings from {}: {}",
98 path, e
99 ))
100 })?,
101 None => Settings::load().unwrap_or_default(),
102 };
103 info!(
104 "create slatedb storage with config: {:?}, settings: {:?}",
105 slate_config, settings
106 );
107 let mut db_builder =
108 DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
109 if let Some(managed) =
110 create_block_cache_from_config(&slate_config.block_cache).await?
111 {
112 db_builder = db_builder.with_db_cache(managed.db_cache);
113 managed_cache = Some(managed.hybrid);
114 }
115 StorageBuilderInner::SlateDb(Box::new(db_builder))
116 }
117 };
118 Ok(Self {
119 inner,
120 semantics: StorageSemantics::default(),
121 managed_cache,
122 })
123 }
124
125 pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
127 self.semantics = semantics;
128 self
129 }
130
131 pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
140 if let StorageBuilderInner::SlateDb(db) = self.inner {
141 self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
142 }
143 self
144 }
145
146 pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
150 match self.inner {
151 StorageBuilderInner::InMemory => {
152 let storage = match self.semantics.merge_operator {
153 Some(op) => InMemoryStorage::with_merge_operator(op),
154 None => InMemoryStorage::new(),
155 };
156 Ok(Arc::new(storage))
157 }
158 StorageBuilderInner::SlateDb(db_builder) => {
159 let mut db_builder = *db_builder;
160 db_builder = db_builder.with_metrics_recorder(Arc::new(MetricsRsRecorder));
161 if let Some(op) = self.semantics.merge_operator {
162 let adapter = SlateDbStorage::merge_operator_adapter(op);
163 db_builder = db_builder.with_merge_operator(Arc::new(adapter));
164 }
165 let db = db_builder.build().await.map_err(|e| {
166 StorageError::Storage(format!("Failed to create SlateDB: {}", e))
167 })?;
168 Ok(Arc::new(SlateDbStorage::new_with_managed_cache(
169 Arc::new(db),
170 self.managed_cache,
171 )))
172 }
173 }
174 }
175}
176
/// Runtime overrides used by `create_storage_read`.
///
/// Any value set here takes precedence over what would be derived from the
/// [`StorageConfig`]: a pre-built block cache, a pre-built object store, and
/// an optional checkpoint id for the reader.
#[derive(Default, Clone)]
pub struct StorageReaderRuntime {
    /// Block cache to use instead of building one from config.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
    /// Object store to use instead of creating one from config.
    pub(crate) object_store: Option<Arc<dyn ObjectStore>>,
    /// Checkpoint id forwarded to the SlateDB reader, if any.
    pub(crate) checkpoint_id: Option<Uuid>,
}
188
189impl StorageReaderRuntime {
190 pub fn new() -> Self {
192 Self::default()
193 }
194
195 pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
203 self.block_cache = Some(cache);
204 self
205 }
206
207 pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
208 self.object_store = Some(object_store);
209 self
210 }
211
212 pub fn with_checkpoint_id(mut self, id: Uuid) -> Self {
220 self.checkpoint_id = Some(id);
221 self
222 }
223}
224
/// Storage behavior applied to both writer and reader construction.
#[derive(Default)]
pub struct StorageSemantics {
    /// Optional merge operator installed on the backend.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
251
252impl StorageSemantics {
253 pub fn new() -> Self {
255 Self::default()
256 }
257
258 pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
263 self.merge_operator = Some(op);
264 self
265 }
266}
267
268pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
273 match config {
274 ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
275 ObjectStoreConfig::Aws(aws_config) => {
276 let store = object_store::aws::AmazonS3Builder::from_env()
277 .with_region(&aws_config.region)
278 .with_bucket_name(&aws_config.bucket)
279 .build()
280 .map_err(|e| {
281 StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
282 })?;
283 Ok(Arc::new(store))
284 }
285 ObjectStoreConfig::Local(local_config) => {
286 std::fs::create_dir_all(&local_config.path).map_err(|e| {
287 StorageError::Storage(format!(
288 "Failed to create storage directory '{}': {}",
289 local_config.path, e
290 ))
291 })?;
292 let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
293 .map_err(|e| {
294 StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
295 })?;
296 Ok(Arc::new(store))
297 }
298 }
299}
300
301pub async fn create_storage_read(
319 config: &StorageConfig,
320 runtime: StorageReaderRuntime,
321 semantics: StorageSemantics,
322 reader_options: slatedb::config::DbReaderOptions,
323) -> StorageResult<Arc<dyn StorageRead>> {
324 match config {
325 StorageConfig::InMemory => {
326 let storage = match semantics.merge_operator {
328 Some(op) => InMemoryStorage::with_merge_operator(op),
329 None => InMemoryStorage::new(),
330 };
331 Ok(Arc::new(storage))
332 }
333 StorageConfig::SlateDb(slate_config) => {
334 let object_store = if let Some(object_store) = &runtime.object_store {
335 object_store.clone()
336 } else {
337 create_object_store(&slate_config.object_store)?
338 };
339
340 let mut builder = DbReader::builder(slate_config.path.clone(), object_store)
341 .with_options(reader_options)
342 .with_metrics_recorder(Arc::new(MetricsRsRecorder));
343 if let Some(checkpoint_id) = runtime.checkpoint_id {
344 builder = builder.with_checkpoint_id(checkpoint_id);
345 }
346 if let Some(op) = semantics.merge_operator {
347 let adapter = SlateDbStorage::merge_operator_adapter(op);
348 builder = builder.with_merge_operator(Arc::new(adapter));
349 }
350 let mut managed_cache: Option<OwnedHybridCache> = None;
354 if let Some(cache) = runtime.block_cache {
355 builder = builder.with_db_cache(cache);
356 } else if let Some(managed) =
357 create_block_cache_from_config(&slate_config.block_cache).await?
358 {
359 builder = builder.with_db_cache(managed.db_cache);
360 managed_cache = Some(managed.hybrid);
361 }
362 let reader = builder.build().await.map_err(|e| {
363 StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
364 })?;
365 Ok(Arc::new(SlateDbStorageReader::new_with_managed_cache(
366 Arc::new(reader),
367 managed_cache,
368 )))
369 }
370 }
371}
372
373async fn create_block_cache_from_config(
378 config: &Option<BlockCacheConfig>,
379) -> StorageResult<Option<ManagedBlockCache>> {
380 let Some(config) = config else {
381 return Ok(None);
382 };
383 match config {
384 BlockCacheConfig::FoyerHybrid(foyer_config) => {
385 use foyer::{
386 DirectFsDeviceOptions, Engine, HybridCacheBuilder, HybridCachePolicy,
387 LargeEngineOptions,
388 };
389
390 let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
391 StorageError::Storage(format!(
392 "memory_capacity {} exceeds usize::MAX on this platform",
393 foyer_config.memory_capacity
394 ))
395 })?;
396 let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
397 StorageError::Storage(format!(
398 "disk_capacity {} exceeds usize::MAX on this platform",
399 foyer_config.disk_capacity
400 ))
401 })?;
402
403 let buffer_pool_size = usize::try_from(foyer_config.effective_buffer_pool_size())
404 .map_err(|_| {
405 StorageError::Storage(format!(
406 "buffer_pool_size {} exceeds usize::MAX on this platform",
407 foyer_config.effective_buffer_pool_size()
408 ))
409 })?;
410 let submit_queue_size_threshold =
411 usize::try_from(foyer_config.submit_queue_size_threshold).map_err(|_| {
412 StorageError::Storage(format!(
413 "submit_queue_size_threshold {} exceeds usize::MAX on this platform",
414 foyer_config.submit_queue_size_threshold
415 ))
416 })?;
417
418 let policy = match foyer_config.write_policy {
419 super::config::FoyerWritePolicy::WriteOnInsertion => {
420 HybridCachePolicy::WriteOnInsertion
421 }
422 super::config::FoyerWritePolicy::WriteOnEviction => {
423 HybridCachePolicy::WriteOnEviction
424 }
425 };
426
427 let cache = HybridCacheBuilder::new()
428 .with_name("slatedb_block_cache")
429 .with_metrics_registry(Box::new(MetricsRsRegistry))
430 .with_policy(policy)
431 .memory(memory_capacity)
432 .with_weighter(|_, v: &CachedEntry| v.size())
433 .storage(Engine::Large(
434 LargeEngineOptions::new()
435 .with_flushers(foyer_config.flushers)
436 .with_buffer_pool_size(buffer_pool_size)
437 .with_submit_queue_size_threshold(submit_queue_size_threshold),
438 ))
439 .with_device_options(
440 DirectFsDeviceOptions::new(&foyer_config.disk_path)
441 .with_capacity(disk_capacity),
442 )
443 .build()
444 .await
445 .map_err(|e| {
446 StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
447 })?;
448
449 info!(
450 memory_mb = foyer_config.memory_capacity / (1024 * 1024),
451 disk_mb = foyer_config.disk_capacity / (1024 * 1024),
452 disk_path = %foyer_config.disk_path,
453 write_policy = ?foyer_config.write_policy,
454 flushers = foyer_config.flushers,
455 buffer_pool_mb = foyer_config.effective_buffer_pool_size() / (1024 * 1024),
456 submit_queue_threshold_mb =
457 foyer_config.submit_queue_size_threshold / (1024 * 1024),
458 "hybrid block cache enabled"
459 );
460
461 let db_cache =
462 Arc::new(FoyerHybridCache::new_with_cache(cache.clone())) as Arc<dyn DbCache>;
463 Ok(Some(ManagedBlockCache {
464 db_cache,
465 hybrid: cache,
466 }))
467 }
468 }
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    // Builds a foyer hybrid-cache config with the given capacities and disk
    // path; remaining knobs use fixed test values.
    fn foyer_cache_config(
        memory_capacity: u64,
        disk_capacity: u64,
        disk_path: String,
    ) -> FoyerHybridCacheConfig {
        FoyerHybridCacheConfig {
            memory_capacity,
            disk_capacity,
            disk_path,
            write_policy: Default::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        }
    }

    // SlateDB config over a local object store rooted at `dir`, with no block
    // cache configured.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    // Happy path: a writer built with a config-driven foyer hybrid cache.
    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    // Happy path for the reader: a writer first creates the DB files, then a
    // reader is opened over the same config with a config-driven cache.
    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        };

        // Writer is built (and dropped) only to initialize the DB on disk.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    // Only meaningful on 32-bit targets, where u64::MAX cannot fit in usize
    // and the capacity conversion must fail before any cache is built.
    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        let config = BlockCacheConfig::FoyerHybrid(foyer_cache_config(
            u64::MAX,
            4 * 1024 * 1024,
            "/tmp/unused".to_string(),
        ));

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    // Config whose block-cache disk path is deliberately unusable (points at
    // a regular file rather than a directory).
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_disk_path.to_string(),
            ))),
        })
    }

    // foyer panics (rather than returning an error) on an invalid disk path,
    // so the build runs in a spawned task and the JoinError is inspected.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        // Create a plain file where a directory is expected.
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    // Same panic expectation as above, but for the reader path. The DB is
    // first initialized by a cache-less writer so only the cache can fail.
    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // Initialize the DB without any block cache so the writer succeeds.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    // When the runtime supplies a cache, the (invalid) config cache must be
    // skipped entirely — success proves the precedence order.
    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // Initialize the DB without any block cache so the writer succeeds.
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // In-memory foyer cache supplied via runtime overrides.
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    // No cache configured -> no cache created.
    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    // Storage should build fine with the block cache entirely absent.
    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}