Skip to main content

common/storage/
factory.rs

1//! Storage factory for creating storage instances from configuration.
2//!
3//! This module provides factory functions for creating storage backends
4//! based on configuration, supporting both InMemory and SlateDB backends.
5
6use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::metrics_recorder::{MetricsRsRecorder, MixtricsBridge as MetricsRsRegistry};
11use super::slate::{SlateDbStorage, SlateDbStorageReader};
12use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
13use slatedb::DbReader;
14use slatedb::config::Settings;
15pub use slatedb::db_cache::DbCache;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18pub use slatedb::db_cache::{CachedEntry, CachedKey};
19use slatedb::object_store::{self, ObjectStore};
20pub use slatedb::{CompactorBuilder, DbBuilder};
21use tracing::info;
22use uuid::Uuid;
23
24/// Handle to a foyer hybrid cache that we own and must close explicitly on
25/// shutdown. Cloneable because foyer's `HybridCache` is Arc-backed.
26///
27/// TODO(slatedb 0.13): remove this and the surrounding plumbing. SlateDB 0.13
28/// adds a `close()` hook to the `DbCache` trait and drives cache shutdown from
29/// `Db::close()` / `DbReader::close()`, so callers won't need to hold a side
30/// handle to the hybrid cache just to close it deterministically.
31pub(crate) type OwnedHybridCache = foyer::HybridCache<CachedKey, CachedEntry>;
32
33/// Block cache we constructed internally — keep the `HybridCache` handle so
34/// we can call `close().await` from `StorageRead::close()` instead of relying
35/// on foyer's Drop-based close, which races the runtime shutdown.
36struct ManagedBlockCache {
37    db_cache: Arc<dyn DbCache>,
38    hybrid: OwnedHybridCache,
39}
40
41/// Builder for creating storage instances from configuration.
42///
43/// `StorageBuilder` provides layered access to the underlying SlateDB
44/// [`DbBuilder`], replacing the previous `StorageRuntime` middleman.
45///
46/// # Example
47///
48/// ```rust,ignore
49/// use common::{StorageBuilder, StorageSemantics, create_object_store};
50/// use common::storage::factory::CompactorBuilder;
51///
52/// // Simple usage:
53/// let storage = StorageBuilder::new(&config.storage).await?
54///     .with_semantics(StorageSemantics::new().with_merge_operator(Arc::new(MyOp)))
55///     .build()
56///     .await?;
57///
58/// // Escape hatch for low-level SlateDB configuration:
59/// let storage = StorageBuilder::new(&config.storage).await?
60///     .map_slatedb(|db| {
61///         let obj_store = create_object_store(&slate_config.object_store).unwrap();
62///         db.with_compactor_builder(
63///             CompactorBuilder::new(slate_config.path.clone(), obj_store)
64///                 .with_runtime(compaction_runtime.handle().clone())
65///         )
66///     })
67///     .build()
68///     .await?;
69/// ```
70pub struct StorageBuilder {
71    inner: StorageBuilderInner,
72    semantics: StorageSemantics,
73    managed_cache: Option<OwnedHybridCache>,
74}
75
76enum StorageBuilderInner {
77    InMemory,
78    SlateDb(Box<DbBuilder<String>>),
79}
80
81impl StorageBuilder {
82    /// Creates a new `StorageBuilder` from a [`StorageConfig`].
83    ///
84    /// For SlateDB configs this creates a [`DbBuilder`] with the configured
85    /// path, object store, settings, and block cache (if configured). For
86    /// InMemory configs it stores a sentinel so that `build()` returns an
87    /// `InMemoryStorage`.
88    pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
89        let mut managed_cache: Option<OwnedHybridCache> = None;
90        let inner = match config {
91            StorageConfig::InMemory => StorageBuilderInner::InMemory,
92            StorageConfig::SlateDb(slate_config) => {
93                let object_store = create_object_store(&slate_config.object_store)?;
94                let settings = match &slate_config.settings_path {
95                    Some(path) => Settings::from_file(path).map_err(|e| {
96                        StorageError::Storage(format!(
97                            "Failed to load SlateDB settings from {}: {}",
98                            path, e
99                        ))
100                    })?,
101                    None => Settings::load().unwrap_or_default(),
102                };
103                info!(
104                    "create slatedb storage with config: {:?}, settings: {:?}",
105                    slate_config, settings
106                );
107                let mut db_builder =
108                    DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
109                if let Some(managed) =
110                    create_block_cache_from_config(&slate_config.block_cache).await?
111                {
112                    db_builder = db_builder.with_db_cache(managed.db_cache);
113                    managed_cache = Some(managed.hybrid);
114                }
115                StorageBuilderInner::SlateDb(Box::new(db_builder))
116            }
117        };
118        Ok(Self {
119            inner,
120            semantics: StorageSemantics::default(),
121            managed_cache,
122        })
123    }
124
125    /// Sets the [`StorageSemantics`] (merge operator, etc.) for this builder.
126    pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
127        self.semantics = semantics;
128        self
129    }
130
131    /// Maps over the underlying [`DbBuilder`] for low-level SlateDB configuration.
132    ///
133    /// This is the escape hatch for any SlateDB knob not exposed by
134    /// `StorageBuilder` itself (compactor builder, block cache, GC runtime, etc.).
135    /// Use `db.with_db_cache(...)` inside the closure to override the
136    /// config-driven block cache.
137    ///
138    /// For InMemory storage this is a no-op.
139    pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
140        if let StorageBuilderInner::SlateDb(db) = self.inner {
141            self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
142        }
143        self
144    }
145
146    /// Builds the storage instance.
147    ///
148    /// Applies semantics (merge operator) to the `DbBuilder` and calls `.build()`.
149    pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
150        match self.inner {
151            StorageBuilderInner::InMemory => {
152                let storage = match self.semantics.merge_operator {
153                    Some(op) => InMemoryStorage::with_merge_operator(op),
154                    None => InMemoryStorage::new(),
155                };
156                Ok(Arc::new(storage))
157            }
158            StorageBuilderInner::SlateDb(db_builder) => {
159                let mut db_builder = *db_builder;
160                db_builder = db_builder.with_metrics_recorder(Arc::new(MetricsRsRecorder));
161                if let Some(op) = self.semantics.merge_operator {
162                    let adapter = SlateDbStorage::merge_operator_adapter(op);
163                    db_builder = db_builder.with_merge_operator(Arc::new(adapter));
164                }
165                let db = db_builder.build().await.map_err(|e| {
166                    StorageError::Storage(format!("Failed to create SlateDB: {}", e))
167                })?;
168                Ok(Arc::new(SlateDbStorage::new_with_managed_cache(
169                    Arc::new(db),
170                    self.managed_cache,
171                )))
172            }
173        }
174    }
175}
176
177/// Runtime options for read-only storage instances.
178///
179/// This struct holds non-serializable runtime configuration for `DbReader`.
180/// Unlike `StorageBuilder`, it only exposes options relevant to readers
181/// (currently just block cache).
182#[derive(Default, Clone)]
183pub struct StorageReaderRuntime {
184    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
185    pub(crate) object_store: Option<Arc<dyn ObjectStore>>,
186    pub(crate) checkpoint_id: Option<Uuid>,
187}
188
189impl StorageReaderRuntime {
190    /// Creates a new reader runtime with default options.
191    pub fn new() -> Self {
192        Self::default()
193    }
194
195    pub fn block_cache(&self) -> Option<Arc<dyn DbCache>> {
196        self.block_cache.clone()
197    }
198
199    /// Sets a block cache for SlateDB reads.
200    ///
201    /// When provided, the `DbReader` will use this cache for SST block lookups,
202    /// reducing disk I/O on repeated reads. Use `FoyerCache::new_with_opts`
203    /// to control capacity.
204    ///
205    /// This option only affects SlateDB storage; it is ignored for in-memory storage.
206    pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
207        self.block_cache = Some(cache);
208        self
209    }
210
211    pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
212        self.object_store = Some(object_store);
213        self
214    }
215
216    /// Pins this reader to a specific SlateDB checkpoint.
217    ///
218    /// When set, the reader serves a consistent view of the database as of
219    /// the checkpoint and does not advance with newer writes. The checkpoint
220    /// must already exist in storage (typically created via
221    /// [`crate::Storage::create_checkpoint`]). Only meaningful for SlateDB
222    /// storage.
223    pub fn with_checkpoint_id(mut self, id: Uuid) -> Self {
224        self.checkpoint_id = Some(id);
225        self
226    }
227}
228
229/// Storage semantics configured by system crates.
230///
231/// This struct holds semantic concerns like merge operators that are specific
232/// to each system (log, timeseries, vector). End users should not use this
233/// directly - each system configures its own semantics internally.
234///
235/// # Internal Use Only
236///
237/// This type is public so that system crates (timeseries, vector, log) can
238/// access it, but it is not intended for end-user consumption.
239///
240/// # Example (for system crate implementers)
241///
242/// ```rust,ignore
243/// // In timeseries crate:
244/// let semantics = StorageSemantics::new()
245///     .with_merge_operator(Arc::new(TimeSeriesMergeOperator));
246/// let storage = StorageBuilder::new(&config).await?
247///     .with_semantics(semantics)
248///     .build()
249///     .await?;
250/// ```
251#[derive(Default)]
252pub struct StorageSemantics {
253    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
254}
255
256impl StorageSemantics {
257    /// Creates new storage semantics with default values.
258    pub fn new() -> Self {
259        Self::default()
260    }
261
262    /// Sets the merge operator for merge operations.
263    ///
264    /// The merge operator defines how values are combined during compaction.
265    /// Each system (timeseries, vector) defines its own merge semantics.
266    pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
267        self.merge_operator = Some(op);
268        self
269    }
270}
271
272/// Creates an object store from configuration without initializing SlateDB.
273///
274/// This is useful for cleanup operations where you need to access the object store
275/// after the database has been closed.
276pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
277    match config {
278        ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
279        ObjectStoreConfig::Aws(aws_config) => {
280            let store = object_store::aws::AmazonS3Builder::from_env()
281                .with_region(&aws_config.region)
282                .with_bucket_name(&aws_config.bucket)
283                .build()
284                .map_err(|e| {
285                    StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
286                })?;
287            Ok(Arc::new(store))
288        }
289        ObjectStoreConfig::Local(local_config) => {
290            std::fs::create_dir_all(&local_config.path).map_err(|e| {
291                StorageError::Storage(format!(
292                    "Failed to create storage directory '{}': {}",
293                    local_config.path, e
294                ))
295            })?;
296            let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
297                .map_err(|e| {
298                StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
299            })?;
300            Ok(Arc::new(store))
301        }
302    }
303}
304
305/// Creates a read-only storage instance based on configuration.
306///
307/// This function creates a storage backend that only supports read operations.
308/// For SlateDB, it uses `DbReader` which does not participate in fencing,
309/// allowing multiple readers to coexist with a single writer.
310///
311/// # Arguments
312///
313/// * `config` - The storage configuration specifying the backend type and settings.
314/// * `semantics` - System-specific semantics like merge operators.
315/// * `reader_options` - SlateDB reader options (e.g., manifest_poll_interval).
316///   These are passed directly to `DbReader::open` for SlateDB storage.
317///   Ignored for InMemory storage.
318///
319/// # Returns
320///
321/// Returns an `Arc<dyn StorageRead>` on success, or a `StorageError` on failure.
322pub async fn create_storage_read(
323    config: &StorageConfig,
324    runtime: StorageReaderRuntime,
325    semantics: StorageSemantics,
326    reader_options: slatedb::config::DbReaderOptions,
327) -> StorageResult<Arc<dyn StorageRead>> {
328    match config {
329        StorageConfig::InMemory => {
330            // InMemory has no fencing, reuse existing implementation
331            let storage = match semantics.merge_operator {
332                Some(op) => InMemoryStorage::with_merge_operator(op),
333                None => InMemoryStorage::new(),
334            };
335            Ok(Arc::new(storage))
336        }
337        StorageConfig::SlateDb(slate_config) => {
338            let object_store = if let Some(object_store) = &runtime.object_store {
339                object_store.clone()
340            } else {
341                create_object_store(&slate_config.object_store)?
342            };
343
344            let mut builder = DbReader::builder(slate_config.path.clone(), object_store)
345                .with_options(reader_options)
346                .with_metrics_recorder(Arc::new(MetricsRsRecorder));
347            if let Some(checkpoint_id) = runtime.checkpoint_id {
348                builder = builder.with_checkpoint_id(checkpoint_id);
349            }
350            if let Some(op) = semantics.merge_operator {
351                let adapter = SlateDbStorage::merge_operator_adapter(op);
352                builder = builder.with_merge_operator(Arc::new(adapter));
353            }
354            // Prefer runtime-provided cache, fall back to config. The
355            // runtime-provided cache is owned by the caller, so we don't hold
356            // a handle to close it on shutdown.
357            let mut managed_cache: Option<OwnedHybridCache> = None;
358            if let Some(cache) = runtime.block_cache {
359                builder = builder.with_db_cache(cache);
360            } else if let Some(managed) =
361                create_block_cache_from_config(&slate_config.block_cache).await?
362            {
363                builder = builder.with_db_cache(managed.db_cache);
364                managed_cache = Some(managed.hybrid);
365            }
366            let reader = builder.build().await.map_err(|e| {
367                StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
368            })?;
369            Ok(Arc::new(SlateDbStorageReader::new_with_managed_cache(
370                Arc::new(reader),
371                managed_cache,
372            )))
373        }
374    }
375}
376
377/// Creates a block cache from the serializable config, if present. Returns
378/// both the `DbCache` trait object handed to SlateDB and a `HybridCache`
379/// handle the caller keeps so it can close the cache deterministically on
380/// shutdown.
381async fn create_block_cache_from_config(
382    config: &Option<BlockCacheConfig>,
383) -> StorageResult<Option<ManagedBlockCache>> {
384    let Some(config) = config else {
385        return Ok(None);
386    };
387    match config {
388        BlockCacheConfig::FoyerHybrid(foyer_config) => {
389            use foyer::{
390                BlockEngineConfig, DeviceBuilder, FsDeviceBuilder, HybridCacheBuilder,
391                HybridCachePolicy, PsyncIoEngineConfig,
392            };
393
394            let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
395                StorageError::Storage(format!(
396                    "memory_capacity {} exceeds usize::MAX on this platform",
397                    foyer_config.memory_capacity
398                ))
399            })?;
400            let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
401                StorageError::Storage(format!(
402                    "disk_capacity {} exceeds usize::MAX on this platform",
403                    foyer_config.disk_capacity
404                ))
405            })?;
406
407            let buffer_pool_size = usize::try_from(foyer_config.effective_buffer_pool_size())
408                .map_err(|_| {
409                    StorageError::Storage(format!(
410                        "buffer_pool_size {} exceeds usize::MAX on this platform",
411                        foyer_config.effective_buffer_pool_size()
412                    ))
413                })?;
414            let submit_queue_size_threshold =
415                usize::try_from(foyer_config.submit_queue_size_threshold).map_err(|_| {
416                    StorageError::Storage(format!(
417                        "submit_queue_size_threshold {} exceeds usize::MAX on this platform",
418                        foyer_config.submit_queue_size_threshold
419                    ))
420                })?;
421
422            let policy = match foyer_config.write_policy {
423                super::config::FoyerWritePolicy::WriteOnInsertion => {
424                    HybridCachePolicy::WriteOnInsertion
425                }
426                super::config::FoyerWritePolicy::WriteOnEviction => {
427                    HybridCachePolicy::WriteOnEviction
428                }
429            };
430
431            let device = {
432                #[cfg(target_os = "linux")]
433                let builder = FsDeviceBuilder::new(&foyer_config.disk_path)
434                    .with_capacity(disk_capacity)
435                    .with_direct(true);
436                #[cfg(not(target_os = "linux"))]
437                let builder =
438                    FsDeviceBuilder::new(&foyer_config.disk_path).with_capacity(disk_capacity);
439                builder.build().map_err(|e| {
440                    StorageError::Storage(format!("Failed to build foyer device: {}", e))
441                })?
442            };
443
444            let cache = HybridCacheBuilder::new()
445                .with_name("slatedb_block_cache")
446                .with_metrics_registry(Box::new(MetricsRsRegistry))
447                .with_policy(policy)
448                .memory(memory_capacity)
449                .with_weighter(|_, v: &CachedEntry| v.size())
450                .storage()
451                .with_io_engine_config(PsyncIoEngineConfig::new())
452                .with_engine_config(
453                    BlockEngineConfig::new(device)
454                        .with_flushers(foyer_config.flushers)
455                        .with_buffer_pool_size(buffer_pool_size)
456                        .with_submit_queue_size_threshold(submit_queue_size_threshold),
457                )
458                .build()
459                .await
460                .map_err(|e| {
461                    StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
462                })?;
463
464            info!(
465                memory_mb = foyer_config.memory_capacity / (1024 * 1024),
466                disk_mb = foyer_config.disk_capacity / (1024 * 1024),
467                disk_path = %foyer_config.disk_path,
468                write_policy = ?foyer_config.write_policy,
469                flushers = foyer_config.flushers,
470                buffer_pool_mb = foyer_config.effective_buffer_pool_size() / (1024 * 1024),
471                submit_queue_threshold_mb =
472                    foyer_config.submit_queue_size_threshold / (1024 * 1024),
473                "hybrid block cache enabled"
474            );
475
476            let db_cache =
477                Arc::new(FoyerHybridCache::new_with_cache(cache.clone())) as Arc<dyn DbCache>;
478            Ok(Some(ManagedBlockCache {
479                db_cache,
480                hybrid: cache,
481            }))
482        }
483    }
484}
485
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    /// One mebibyte; the cache capacities used by these tests are multiples of it.
    const MIB: u64 = 1024 * 1024;

    /// Builds a foyer hybrid cache config with the given capacities and disk
    /// path, using fixed values for the remaining knobs.
    fn foyer_cache_config(
        memory_capacity: u64,
        disk_capacity: u64,
        disk_path: String,
    ) -> FoyerHybridCacheConfig {
        FoyerHybridCacheConfig {
            memory_capacity,
            disk_capacity,
            disk_path,
            write_policy: Default::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        }
    }

    /// Converts a test path into the `String` form the configs expect.
    fn path_string(path: &std::path::Path) -> String {
        path.to_str().unwrap().to_string()
    }

    /// A 1 MiB memory / 4 MiB disk hybrid cache config rooted at `disk_path`.
    fn small_hybrid_cache(disk_path: &str) -> BlockCacheConfig {
        BlockCacheConfig::FoyerHybrid(foyer_cache_config(MIB, 4 * MIB, disk_path.to_string()))
    }

    /// SlateDB config backed by a local object store at `obj_dir`.
    fn local_slate_config(
        obj_dir: &std::path::Path,
        block_cache: Option<BlockCacheConfig>,
    ) -> SlateDbStorageConfig {
        SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: path_string(obj_dir),
            }),
            settings_path: None,
            block_cache,
        }
    }

    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(local_slate_config(dir, None))
    }

    /// Helper: creates a SlateDb config whose block_cache disk_path is a regular file
    /// (not a directory), which foyer deterministically rejects.
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(local_slate_config(
            obj_dir,
            Some(small_hybrid_cache(bad_disk_path)),
        ))
    }

    /// Creates an empty regular file named `not-a-dir` inside `dir`. Foyer
    /// expects a directory as disk_path, so this path is invalid for it.
    fn regular_file_in(dir: &std::path::Path) -> std::path::PathBuf {
        let file = dir.join("not-a-dir");
        std::fs::write(&file, b"").unwrap();
        file
    }

    /// Creates a `block-cache` directory inside `dir`.
    fn cache_dir_in(dir: &std::path::Path) -> std::path::PathBuf {
        let cache_dir = dir.join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();
        cache_dir
    }

    /// Opens a writer so that a reader has a manifest to read, then drops it
    /// before any reader is opened (SlateDB fencing).
    async fn open_then_drop_writer(slate_config: SlateDbStorageConfig) {
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        drop(writer);
    }

    /// Opens a reader with default semantics and default reader options.
    async fn open_reader(
        slate_config: SlateDbStorageConfig,
        runtime: StorageReaderRuntime,
    ) -> StorageResult<Arc<dyn StorageRead>> {
        create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await
    }

    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = cache_dir_in(tmp.path());
        let config = StorageConfig::SlateDb(local_slate_config(
            &tmp.path().join("obj"),
            Some(small_hybrid_cache(&path_string(&cache_dir))),
        ));

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = cache_dir_in(tmp.path());
        let slate_config = local_slate_config(
            &tmp.path().join("obj"),
            Some(small_hybrid_cache(&path_string(&cache_dir))),
        );

        // The writer goes first so the reader has a manifest to read.
        open_then_drop_writer(slate_config.clone()).await;

        let reader = open_reader(slate_config, StorageReaderRuntime::new()).await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        // On 32-bit platforms, u64::MAX > usize::MAX triggers our overflow check.
        // On 64-bit this is a no-op, so gate on 32-bit.
        let config = BlockCacheConfig::FoyerHybrid(foyer_cache_config(
            u64::MAX,
            4 * MIB,
            "/tmp/unused".to_string(),
        ));

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    // Note: foyer panics (unwrap inside DirectFsDevice) on invalid disk paths
    // rather than returning an error. We isolate the panic to the storage
    // construction call via tokio::spawn so setup unwrap() failures don't
    // mask regressions.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        // A regular file stands in for disk_path; foyer expects a directory.
        let bad_path = regular_file_in(tmp.path());

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        // Only the builder calls run inside the spawned task.
        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            matches!(&result, Err(e) if e.is_panic()),
            "expected foyer to panic on invalid disk_path"
        );
    }

    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = regular_file_in(tmp.path());
        let obj_dir = tmp.path().join("obj");
        let slate_config =
            local_slate_config(&obj_dir, Some(small_hybrid_cache(&path_string(&bad_path))));

        // The writer runs without a cache; it only has to leave a manifest behind.
        open_then_drop_writer(local_slate_config(&obj_dir, None)).await;

        // Only the create_storage_read call runs inside the spawned task.
        let handle = tokio::spawn(async move {
            let _ = open_reader(slate_config, StorageReaderRuntime::new()).await;
        });
        let result = handle.await;
        assert!(
            matches!(&result, Err(e) if e.is_panic()),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = regular_file_in(tmp.path());
        let obj_dir = tmp.path().join("obj");
        let slate_config =
            local_slate_config(&obj_dir, Some(small_hybrid_cache(&path_string(&bad_path))));

        // The writer runs without a cache; it only has to leave a manifest behind.
        open_then_drop_writer(local_slate_config(&obj_dir, None)).await;

        // The runtime cache should bypass the invalid config cache entirely.
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = open_reader(slate_config, runtime).await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}