// common/storage/factory.rs

1//! Storage factory for creating storage instances from configuration.
2//!
3//! This module provides factory functions for creating storage backends
4//! based on configuration, supporting both InMemory and SlateDB backends.
5
6use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::metrics_recorder::{MetricsRsRecorder, MixtricsBridge as MetricsRsRegistry};
11use super::slate::{SlateDbStorage, SlateDbStorageReader};
12use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
13use slatedb::DbReader;
14use slatedb::config::Settings;
15pub use slatedb::db_cache::CachedEntry;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18use slatedb::db_cache::{CachedKey, DbCache};
19use slatedb::object_store::{self, ObjectStore};
20pub use slatedb::{CompactorBuilder, DbBuilder};
21use tracing::info;
22use uuid::Uuid;
23
/// Handle to a foyer hybrid cache that we own and must close explicitly on
/// shutdown. Cloneable because foyer's `HybridCache` is Arc-backed.
///
/// Keyed by SlateDB's `CachedKey` and storing `CachedEntry` values, i.e. the
/// same types the `DbCache` trait object wraps.
///
/// TODO(slatedb 0.13): remove this and the surrounding plumbing. SlateDB 0.13
/// adds a `close()` hook to the `DbCache` trait and drives cache shutdown from
/// `Db::close()` / `DbReader::close()`, so callers won't need to hold a side
/// handle to the hybrid cache just to close it deterministically.
pub(crate) type OwnedHybridCache = foyer::HybridCache<CachedKey, CachedEntry>;
32
/// Block cache we constructed internally — keep the `HybridCache` handle so
/// we can call `close().await` from `StorageRead::close()` instead of relying
/// on foyer's Drop-based close, which races the runtime shutdown.
struct ManagedBlockCache {
    // Trait-object view of the cache, handed to SlateDB via `with_db_cache`.
    db_cache: Arc<dyn DbCache>,
    // Concrete foyer handle retained so shutdown can close it deterministically.
    hybrid: OwnedHybridCache,
}
40
/// Builder for creating storage instances from configuration.
///
/// `StorageBuilder` provides layered access to the underlying SlateDB
/// [`DbBuilder`], replacing the previous `StorageRuntime` middleman.
///
/// # Example
///
/// ```rust,ignore
/// use common::{StorageBuilder, StorageSemantics, create_object_store};
/// use common::storage::factory::CompactorBuilder;
///
/// // Simple usage:
/// let storage = StorageBuilder::new(&config.storage).await?
///     .with_semantics(StorageSemantics::new().with_merge_operator(Arc::new(MyOp)))
///     .build()
///     .await?;
///
/// // Escape hatch for low-level SlateDB configuration:
/// let storage = StorageBuilder::new(&config.storage).await?
///     .map_slatedb(|db| {
///         let obj_store = create_object_store(&slate_config.object_store).unwrap();
///         db.with_compactor_builder(
///             CompactorBuilder::new(slate_config.path.clone(), obj_store)
///                 .with_runtime(compaction_runtime.handle().clone())
///         )
///     })
///     .build()
///     .await?;
/// ```
pub struct StorageBuilder {
    // Backend selection plus the wrapped SlateDB builder, if any.
    inner: StorageBuilderInner,
    // System-specific semantics (merge operator) applied at `build()` time.
    semantics: StorageSemantics,
    // Set only when `new()` constructed the block cache from config; passed
    // through to the storage wrapper so it can close the cache on shutdown.
    managed_cache: Option<OwnedHybridCache>,
}
75
/// Backend selector for [`StorageBuilder`]: either the in-memory sentinel or
/// a SlateDB builder (boxed to keep the enum small).
enum StorageBuilderInner {
    InMemory,
    SlateDb(Box<DbBuilder<String>>),
}
80
81impl StorageBuilder {
82    /// Creates a new `StorageBuilder` from a [`StorageConfig`].
83    ///
84    /// For SlateDB configs this creates a [`DbBuilder`] with the configured
85    /// path, object store, settings, and block cache (if configured). For
86    /// InMemory configs it stores a sentinel so that `build()` returns an
87    /// `InMemoryStorage`.
88    pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
89        let mut managed_cache: Option<OwnedHybridCache> = None;
90        let inner = match config {
91            StorageConfig::InMemory => StorageBuilderInner::InMemory,
92            StorageConfig::SlateDb(slate_config) => {
93                let object_store = create_object_store(&slate_config.object_store)?;
94                let settings = match &slate_config.settings_path {
95                    Some(path) => Settings::from_file(path).map_err(|e| {
96                        StorageError::Storage(format!(
97                            "Failed to load SlateDB settings from {}: {}",
98                            path, e
99                        ))
100                    })?,
101                    None => Settings::load().unwrap_or_default(),
102                };
103                info!(
104                    "create slatedb storage with config: {:?}, settings: {:?}",
105                    slate_config, settings
106                );
107                let mut db_builder =
108                    DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
109                if let Some(managed) =
110                    create_block_cache_from_config(&slate_config.block_cache).await?
111                {
112                    db_builder = db_builder.with_db_cache(managed.db_cache);
113                    managed_cache = Some(managed.hybrid);
114                }
115                StorageBuilderInner::SlateDb(Box::new(db_builder))
116            }
117        };
118        Ok(Self {
119            inner,
120            semantics: StorageSemantics::default(),
121            managed_cache,
122        })
123    }
124
125    /// Sets the [`StorageSemantics`] (merge operator, etc.) for this builder.
126    pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
127        self.semantics = semantics;
128        self
129    }
130
131    /// Maps over the underlying [`DbBuilder`] for low-level SlateDB configuration.
132    ///
133    /// This is the escape hatch for any SlateDB knob not exposed by
134    /// `StorageBuilder` itself (compactor builder, block cache, GC runtime, etc.).
135    /// Use `db.with_db_cache(...)` inside the closure to override the
136    /// config-driven block cache.
137    ///
138    /// For InMemory storage this is a no-op.
139    pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
140        if let StorageBuilderInner::SlateDb(db) = self.inner {
141            self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
142        }
143        self
144    }
145
146    /// Builds the storage instance.
147    ///
148    /// Applies semantics (merge operator) to the `DbBuilder` and calls `.build()`.
149    pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
150        match self.inner {
151            StorageBuilderInner::InMemory => {
152                let storage = match self.semantics.merge_operator {
153                    Some(op) => InMemoryStorage::with_merge_operator(op),
154                    None => InMemoryStorage::new(),
155                };
156                Ok(Arc::new(storage))
157            }
158            StorageBuilderInner::SlateDb(db_builder) => {
159                let mut db_builder = *db_builder;
160                db_builder = db_builder.with_metrics_recorder(Arc::new(MetricsRsRecorder));
161                if let Some(op) = self.semantics.merge_operator {
162                    let adapter = SlateDbStorage::merge_operator_adapter(op);
163                    db_builder = db_builder.with_merge_operator(Arc::new(adapter));
164                }
165                let db = db_builder.build().await.map_err(|e| {
166                    StorageError::Storage(format!("Failed to create SlateDB: {}", e))
167                })?;
168                Ok(Arc::new(SlateDbStorage::new_with_managed_cache(
169                    Arc::new(db),
170                    self.managed_cache,
171                )))
172            }
173        }
174    }
175}
176
/// Runtime options for read-only storage instances.
///
/// This struct holds non-serializable runtime configuration for `DbReader`.
/// Unlike `StorageBuilder`, it only exposes options relevant to readers
/// (currently just block cache).
#[derive(Default, Clone)]
pub struct StorageReaderRuntime {
    // Caller-owned block cache; takes precedence over any config-driven cache.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
    // Pre-built object store; when absent one is created from config.
    pub(crate) object_store: Option<Arc<dyn ObjectStore>>,
    // When set, pins the reader to a specific SlateDB checkpoint.
    pub(crate) checkpoint_id: Option<Uuid>,
}
188
189impl StorageReaderRuntime {
190    /// Creates a new reader runtime with default options.
191    pub fn new() -> Self {
192        Self::default()
193    }
194
195    /// Sets a block cache for SlateDB reads.
196    ///
197    /// When provided, the `DbReader` will use this cache for SST block lookups,
198    /// reducing disk I/O on repeated reads. Use `FoyerCache::new_with_opts`
199    /// to control capacity.
200    ///
201    /// This option only affects SlateDB storage; it is ignored for in-memory storage.
202    pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
203        self.block_cache = Some(cache);
204        self
205    }
206
207    pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
208        self.object_store = Some(object_store);
209        self
210    }
211
212    /// Pins this reader to a specific SlateDB checkpoint.
213    ///
214    /// When set, the reader serves a consistent view of the database as of
215    /// the checkpoint and does not advance with newer writes. The checkpoint
216    /// must already exist in storage (typically created via
217    /// [`crate::Storage::create_checkpoint`]). Only meaningful for SlateDB
218    /// storage.
219    pub fn with_checkpoint_id(mut self, id: Uuid) -> Self {
220        self.checkpoint_id = Some(id);
221        self
222    }
223}
224
/// Storage semantics configured by system crates.
///
/// This struct holds semantic concerns like merge operators that are specific
/// to each system (log, timeseries, vector). End users should not use this
/// directly - each system configures its own semantics internally.
///
/// # Internal Use Only
///
/// This type is public so that system crates (timeseries, vector, log) can
/// access it, but it is not intended for end-user consumption.
///
/// # Example (for system crate implementers)
///
/// ```rust,ignore
/// // In timeseries crate:
/// let semantics = StorageSemantics::new()
///     .with_merge_operator(Arc::new(TimeSeriesMergeOperator));
/// let storage = StorageBuilder::new(&config).await?
///     .with_semantics(semantics)
///     .build()
///     .await?;
/// ```
#[derive(Default)]
pub struct StorageSemantics {
    // Optional merge operator applied to both writers and readers at build time.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
251
252impl StorageSemantics {
253    /// Creates new storage semantics with default values.
254    pub fn new() -> Self {
255        Self::default()
256    }
257
258    /// Sets the merge operator for merge operations.
259    ///
260    /// The merge operator defines how values are combined during compaction.
261    /// Each system (timeseries, vector) defines its own merge semantics.
262    pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
263        self.merge_operator = Some(op);
264        self
265    }
266}
267
268/// Creates an object store from configuration without initializing SlateDB.
269///
270/// This is useful for cleanup operations where you need to access the object store
271/// after the database has been closed.
272pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
273    match config {
274        ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
275        ObjectStoreConfig::Aws(aws_config) => {
276            let store = object_store::aws::AmazonS3Builder::from_env()
277                .with_region(&aws_config.region)
278                .with_bucket_name(&aws_config.bucket)
279                .build()
280                .map_err(|e| {
281                    StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
282                })?;
283            Ok(Arc::new(store))
284        }
285        ObjectStoreConfig::Local(local_config) => {
286            std::fs::create_dir_all(&local_config.path).map_err(|e| {
287                StorageError::Storage(format!(
288                    "Failed to create storage directory '{}': {}",
289                    local_config.path, e
290                ))
291            })?;
292            let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
293                .map_err(|e| {
294                StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
295            })?;
296            Ok(Arc::new(store))
297        }
298    }
299}
300
301/// Creates a read-only storage instance based on configuration.
302///
303/// This function creates a storage backend that only supports read operations.
304/// For SlateDB, it uses `DbReader` which does not participate in fencing,
305/// allowing multiple readers to coexist with a single writer.
306///
307/// # Arguments
308///
309/// * `config` - The storage configuration specifying the backend type and settings.
310/// * `semantics` - System-specific semantics like merge operators.
311/// * `reader_options` - SlateDB reader options (e.g., manifest_poll_interval).
312///   These are passed directly to `DbReader::open` for SlateDB storage.
313///   Ignored for InMemory storage.
314///
315/// # Returns
316///
317/// Returns an `Arc<dyn StorageRead>` on success, or a `StorageError` on failure.
318pub async fn create_storage_read(
319    config: &StorageConfig,
320    runtime: StorageReaderRuntime,
321    semantics: StorageSemantics,
322    reader_options: slatedb::config::DbReaderOptions,
323) -> StorageResult<Arc<dyn StorageRead>> {
324    match config {
325        StorageConfig::InMemory => {
326            // InMemory has no fencing, reuse existing implementation
327            let storage = match semantics.merge_operator {
328                Some(op) => InMemoryStorage::with_merge_operator(op),
329                None => InMemoryStorage::new(),
330            };
331            Ok(Arc::new(storage))
332        }
333        StorageConfig::SlateDb(slate_config) => {
334            let object_store = if let Some(object_store) = &runtime.object_store {
335                object_store.clone()
336            } else {
337                create_object_store(&slate_config.object_store)?
338            };
339
340            let mut builder = DbReader::builder(slate_config.path.clone(), object_store)
341                .with_options(reader_options)
342                .with_metrics_recorder(Arc::new(MetricsRsRecorder));
343            if let Some(checkpoint_id) = runtime.checkpoint_id {
344                builder = builder.with_checkpoint_id(checkpoint_id);
345            }
346            if let Some(op) = semantics.merge_operator {
347                let adapter = SlateDbStorage::merge_operator_adapter(op);
348                builder = builder.with_merge_operator(Arc::new(adapter));
349            }
350            // Prefer runtime-provided cache, fall back to config. The
351            // runtime-provided cache is owned by the caller, so we don't hold
352            // a handle to close it on shutdown.
353            let mut managed_cache: Option<OwnedHybridCache> = None;
354            if let Some(cache) = runtime.block_cache {
355                builder = builder.with_db_cache(cache);
356            } else if let Some(managed) =
357                create_block_cache_from_config(&slate_config.block_cache).await?
358            {
359                builder = builder.with_db_cache(managed.db_cache);
360                managed_cache = Some(managed.hybrid);
361            }
362            let reader = builder.build().await.map_err(|e| {
363                StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
364            })?;
365            Ok(Arc::new(SlateDbStorageReader::new_with_managed_cache(
366                Arc::new(reader),
367                managed_cache,
368            )))
369        }
370    }
371}
372
373/// Creates a block cache from the serializable config, if present. Returns
374/// both the `DbCache` trait object handed to SlateDB and a `HybridCache`
375/// handle the caller keeps so it can close the cache deterministically on
376/// shutdown.
377async fn create_block_cache_from_config(
378    config: &Option<BlockCacheConfig>,
379) -> StorageResult<Option<ManagedBlockCache>> {
380    let Some(config) = config else {
381        return Ok(None);
382    };
383    match config {
384        BlockCacheConfig::FoyerHybrid(foyer_config) => {
385            use foyer::{
386                DirectFsDeviceOptions, Engine, HybridCacheBuilder, HybridCachePolicy,
387                LargeEngineOptions,
388            };
389
390            let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
391                StorageError::Storage(format!(
392                    "memory_capacity {} exceeds usize::MAX on this platform",
393                    foyer_config.memory_capacity
394                ))
395            })?;
396            let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
397                StorageError::Storage(format!(
398                    "disk_capacity {} exceeds usize::MAX on this platform",
399                    foyer_config.disk_capacity
400                ))
401            })?;
402
403            let buffer_pool_size = usize::try_from(foyer_config.effective_buffer_pool_size())
404                .map_err(|_| {
405                    StorageError::Storage(format!(
406                        "buffer_pool_size {} exceeds usize::MAX on this platform",
407                        foyer_config.effective_buffer_pool_size()
408                    ))
409                })?;
410            let submit_queue_size_threshold =
411                usize::try_from(foyer_config.submit_queue_size_threshold).map_err(|_| {
412                    StorageError::Storage(format!(
413                        "submit_queue_size_threshold {} exceeds usize::MAX on this platform",
414                        foyer_config.submit_queue_size_threshold
415                    ))
416                })?;
417
418            let policy = match foyer_config.write_policy {
419                super::config::FoyerWritePolicy::WriteOnInsertion => {
420                    HybridCachePolicy::WriteOnInsertion
421                }
422                super::config::FoyerWritePolicy::WriteOnEviction => {
423                    HybridCachePolicy::WriteOnEviction
424                }
425            };
426
427            let cache = HybridCacheBuilder::new()
428                .with_name("slatedb_block_cache")
429                .with_metrics_registry(Box::new(MetricsRsRegistry))
430                .with_policy(policy)
431                .memory(memory_capacity)
432                .with_weighter(|_, v: &CachedEntry| v.size())
433                .storage(Engine::Large(
434                    LargeEngineOptions::new()
435                        .with_flushers(foyer_config.flushers)
436                        .with_buffer_pool_size(buffer_pool_size)
437                        .with_submit_queue_size_threshold(submit_queue_size_threshold),
438                ))
439                .with_device_options(
440                    DirectFsDeviceOptions::new(&foyer_config.disk_path)
441                        .with_capacity(disk_capacity),
442                )
443                .build()
444                .await
445                .map_err(|e| {
446                    StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
447                })?;
448
449            info!(
450                memory_mb = foyer_config.memory_capacity / (1024 * 1024),
451                disk_mb = foyer_config.disk_capacity / (1024 * 1024),
452                disk_path = %foyer_config.disk_path,
453                write_policy = ?foyer_config.write_policy,
454                flushers = foyer_config.flushers,
455                buffer_pool_mb = foyer_config.effective_buffer_pool_size() / (1024 * 1024),
456                submit_queue_threshold_mb =
457                    foyer_config.submit_queue_size_threshold / (1024 * 1024),
458                "hybrid block cache enabled"
459            );
460
461            let db_cache =
462                Arc::new(FoyerHybridCache::new_with_cache(cache.clone())) as Arc<dyn DbCache>;
463            Ok(Some(ManagedBlockCache {
464                db_cache,
465                hybrid: cache,
466            }))
467        }
468    }
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    /// Helper: builds a `FoyerHybridCacheConfig` with the given capacities and
    /// disk path, using fixed values for the remaining knobs.
    fn foyer_cache_config(
        memory_capacity: u64,
        disk_capacity: u64,
        disk_path: String,
    ) -> FoyerHybridCacheConfig {
        FoyerHybridCacheConfig {
            memory_capacity,
            disk_capacity,
            disk_path,
            write_policy: Default::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        }
    }

    /// Helper: a SlateDB storage config backed by a local-filesystem object
    /// store rooted at `dir`, with no settings file and no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        };

        // First open a writer so the reader has a manifest to read
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        // Close writer before opening reader (SlateDB fencing)
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        // On 32-bit platforms, u64::MAX > usize::MAX triggers our overflow check.
        // On 64-bit this is a no-op, so gate on 32-bit.
        let config = BlockCacheConfig::FoyerHybrid(foyer_cache_config(
            u64::MAX,
            4 * 1024 * 1024,
            "/tmp/unused".to_string(),
        ));

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    /// Helper: creates a SlateDb config whose block_cache disk_path is a regular file
    /// (not a directory), which foyer deterministically rejects.
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_disk_path.to_string(),
            ))),
        })
    }

    // Note: foyer panics (unwrap inside DirectFsDevice) on invalid disk paths
    // rather than returning an error. We isolate the panic to the create_storage
    // call via tokio::spawn so setup unwrap() failures don't mask regressions.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        // Use a regular file as disk_path — foyer expects a directory
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        // Isolate the expected panic to just the build call
        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // First open a writer (without cache) so the reader has a manifest
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Isolate the expected panic to just the create_storage_read call
        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    // The config cache here is deliberately invalid; success proves the
    // runtime-provided cache short-circuits config-driven cache creation.
    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // First open a writer (without cache) so the reader has a manifest
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Runtime cache should bypass the invalid config cache
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}