common/storage/factory.rs

1//! Storage factory for creating storage instances from configuration.
2//!
3//! This module provides factory functions for creating storage backends
4//! based on configuration, supporting both InMemory and SlateDB backends.
5
6use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::metrics_recorder::{MetricsRsRecorder, MixtricsBridge as MetricsRsRegistry};
11use super::slate::{SlateDbStorage, SlateDbStorageReader};
12use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
13use slatedb::DbReader;
14use slatedb::config::Settings;
15pub use slatedb::db_cache::CachedEntry;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18use slatedb::db_cache::{CachedKey, DbCache};
19use slatedb::object_store::{self, ObjectStore};
20pub use slatedb::{CompactorBuilder, DbBuilder};
21use tracing::info;
22
/// Handle to a foyer hybrid cache that we own and must close explicitly on
/// shutdown. Cloneable because foyer's `HybridCache` is Arc-backed, so a
/// clone is just a refcount bump on the same cache.
///
/// TODO(slatedb 0.13): remove this and the surrounding plumbing. SlateDB 0.13
/// adds a `close()` hook to the `DbCache` trait and drives cache shutdown from
/// `Db::close()` / `DbReader::close()`, so callers won't need to hold a side
/// handle to the hybrid cache just to close it deterministically.
pub(crate) type OwnedHybridCache = foyer::HybridCache<CachedKey, CachedEntry>;
31
/// Block cache we constructed internally — keep the `HybridCache` handle so
/// we can call `close().await` from `StorageRead::close()` instead of relying
/// on foyer's Drop-based close, which races the runtime shutdown.
struct ManagedBlockCache {
    // Trait-object view handed to SlateDB via `with_db_cache`.
    db_cache: Arc<dyn DbCache>,
    // Owned handle kept by the storage so the cache can be closed
    // deterministically on shutdown.
    hybrid: OwnedHybridCache,
}
39
/// Builder for creating storage instances from configuration.
///
/// `StorageBuilder` provides layered access to the underlying SlateDB
/// [`DbBuilder`], replacing the previous `StorageRuntime` middleman.
///
/// # Example
///
/// ```rust,ignore
/// use common::{StorageBuilder, StorageSemantics, create_object_store};
/// use common::storage::factory::CompactorBuilder;
///
/// // Simple usage:
/// let storage = StorageBuilder::new(&config.storage).await?
///     .with_semantics(StorageSemantics::new().with_merge_operator(Arc::new(MyOp)))
///     .build()
///     .await?;
///
/// // Escape hatch for low-level SlateDB configuration:
/// let storage = StorageBuilder::new(&config.storage).await?
///     .map_slatedb(|db| {
///         let obj_store = create_object_store(&slate_config.object_store).unwrap();
///         db.with_compactor_builder(
///             CompactorBuilder::new(slate_config.path.clone(), obj_store)
///                 .with_runtime(compaction_runtime.handle().clone())
///         )
///     })
///     .build()
///     .await?;
/// ```
pub struct StorageBuilder {
    // Backend-specific state: an InMemory sentinel or a boxed SlateDB
    // `DbBuilder`.
    inner: StorageBuilderInner,
    // System-specific semantics (merge operator) applied at `build()` time.
    semantics: StorageSemantics,
    // Hybrid cache handle retained when the block cache was built from
    // config, so the storage can close it deterministically on shutdown.
    managed_cache: Option<OwnedHybridCache>,
}
74
/// Backend selector for [`StorageBuilder`].
enum StorageBuilderInner {
    // Sentinel: `build()` produces an `InMemoryStorage`.
    InMemory,
    // SlateDB path: wraps the underlying `DbBuilder` (boxed to keep the
    // enum small).
    SlateDb(Box<DbBuilder<String>>),
}
79
80impl StorageBuilder {
81    /// Creates a new `StorageBuilder` from a [`StorageConfig`].
82    ///
83    /// For SlateDB configs this creates a [`DbBuilder`] with the configured
84    /// path, object store, settings, and block cache (if configured). For
85    /// InMemory configs it stores a sentinel so that `build()` returns an
86    /// `InMemoryStorage`.
87    pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
88        let mut managed_cache: Option<OwnedHybridCache> = None;
89        let inner = match config {
90            StorageConfig::InMemory => StorageBuilderInner::InMemory,
91            StorageConfig::SlateDb(slate_config) => {
92                let object_store = create_object_store(&slate_config.object_store)?;
93                let settings = match &slate_config.settings_path {
94                    Some(path) => Settings::from_file(path).map_err(|e| {
95                        StorageError::Storage(format!(
96                            "Failed to load SlateDB settings from {}: {}",
97                            path, e
98                        ))
99                    })?,
100                    None => Settings::load().unwrap_or_default(),
101                };
102                info!(
103                    "create slatedb storage with config: {:?}, settings: {:?}",
104                    slate_config, settings
105                );
106                let mut db_builder =
107                    DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
108                if let Some(managed) =
109                    create_block_cache_from_config(&slate_config.block_cache).await?
110                {
111                    db_builder = db_builder.with_db_cache(managed.db_cache);
112                    managed_cache = Some(managed.hybrid);
113                }
114                StorageBuilderInner::SlateDb(Box::new(db_builder))
115            }
116        };
117        Ok(Self {
118            inner,
119            semantics: StorageSemantics::default(),
120            managed_cache,
121        })
122    }
123
124    /// Sets the [`StorageSemantics`] (merge operator, etc.) for this builder.
125    pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
126        self.semantics = semantics;
127        self
128    }
129
130    /// Maps over the underlying [`DbBuilder`] for low-level SlateDB configuration.
131    ///
132    /// This is the escape hatch for any SlateDB knob not exposed by
133    /// `StorageBuilder` itself (compactor builder, block cache, GC runtime, etc.).
134    /// Use `db.with_db_cache(...)` inside the closure to override the
135    /// config-driven block cache.
136    ///
137    /// For InMemory storage this is a no-op.
138    pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
139        if let StorageBuilderInner::SlateDb(db) = self.inner {
140            self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
141        }
142        self
143    }
144
145    /// Builds the storage instance.
146    ///
147    /// Applies semantics (merge operator) to the `DbBuilder` and calls `.build()`.
148    pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
149        match self.inner {
150            StorageBuilderInner::InMemory => {
151                let storage = match self.semantics.merge_operator {
152                    Some(op) => InMemoryStorage::with_merge_operator(op),
153                    None => InMemoryStorage::new(),
154                };
155                Ok(Arc::new(storage))
156            }
157            StorageBuilderInner::SlateDb(db_builder) => {
158                let mut db_builder = *db_builder;
159                db_builder = db_builder.with_metrics_recorder(Arc::new(MetricsRsRecorder));
160                if let Some(op) = self.semantics.merge_operator {
161                    let adapter = SlateDbStorage::merge_operator_adapter(op);
162                    db_builder = db_builder.with_merge_operator(Arc::new(adapter));
163                }
164                let db = db_builder.build().await.map_err(|e| {
165                    StorageError::Storage(format!("Failed to create SlateDB: {}", e))
166                })?;
167                Ok(Arc::new(SlateDbStorage::new_with_managed_cache(
168                    Arc::new(db),
169                    self.managed_cache,
170                )))
171            }
172        }
173    }
174}
175
/// Runtime options for read-only storage instances.
///
/// This struct holds non-serializable runtime configuration for `DbReader`.
/// Unlike `StorageBuilder`, it only exposes options relevant to readers
/// (currently just block cache and an optional pre-built object store).
#[derive(Default, Clone)]
pub struct StorageReaderRuntime {
    // Caller-owned block cache; takes precedence over the config-driven one
    // in `create_storage_read`.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
    // Pre-built object store; when set, `create_storage_read` skips
    // constructing one from config.
    pub(crate) object_store: Option<Arc<dyn ObjectStore>>,
}
186
187impl StorageReaderRuntime {
188    /// Creates a new reader runtime with default options.
189    pub fn new() -> Self {
190        Self::default()
191    }
192
193    /// Sets a block cache for SlateDB reads.
194    ///
195    /// When provided, the `DbReader` will use this cache for SST block lookups,
196    /// reducing disk I/O on repeated reads. Use `FoyerCache::new_with_opts`
197    /// to control capacity.
198    ///
199    /// This option only affects SlateDB storage; it is ignored for in-memory storage.
200    pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
201        self.block_cache = Some(cache);
202        self
203    }
204
205    pub fn with_object_store(mut self, object_store: Arc<dyn ObjectStore>) -> Self {
206        self.object_store = Some(object_store);
207        self
208    }
209}
210
/// Storage semantics configured by system crates.
///
/// This struct holds semantic concerns like merge operators that are specific
/// to each system (log, timeseries, vector). End users should not use this
/// directly - each system configures its own semantics internally.
///
/// # Internal Use Only
///
/// This type is public so that system crates (timeseries, vector, log) can
/// access it, but it is not intended for end-user consumption.
///
/// # Example (for system crate implementers)
///
/// ```rust,ignore
/// // In timeseries crate:
/// let semantics = StorageSemantics::new()
///     .with_merge_operator(Arc::new(TimeSeriesMergeOperator));
/// let storage = StorageBuilder::new(&config).await?
///     .with_semantics(semantics)
///     .build()
///     .await?;
/// ```
#[derive(Default)]
pub struct StorageSemantics {
    // Optional merge operator; when set, it is adapted and installed on the
    // SlateDB builder (or the InMemory store) at build time.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
237
238impl StorageSemantics {
239    /// Creates new storage semantics with default values.
240    pub fn new() -> Self {
241        Self::default()
242    }
243
244    /// Sets the merge operator for merge operations.
245    ///
246    /// The merge operator defines how values are combined during compaction.
247    /// Each system (timeseries, vector) defines its own merge semantics.
248    pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
249        self.merge_operator = Some(op);
250        self
251    }
252}
253
254/// Creates an object store from configuration without initializing SlateDB.
255///
256/// This is useful for cleanup operations where you need to access the object store
257/// after the database has been closed.
258pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
259    match config {
260        ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
261        ObjectStoreConfig::Aws(aws_config) => {
262            let store = object_store::aws::AmazonS3Builder::from_env()
263                .with_region(&aws_config.region)
264                .with_bucket_name(&aws_config.bucket)
265                .build()
266                .map_err(|e| {
267                    StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
268                })?;
269            Ok(Arc::new(store))
270        }
271        ObjectStoreConfig::Local(local_config) => {
272            std::fs::create_dir_all(&local_config.path).map_err(|e| {
273                StorageError::Storage(format!(
274                    "Failed to create storage directory '{}': {}",
275                    local_config.path, e
276                ))
277            })?;
278            let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
279                .map_err(|e| {
280                StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
281            })?;
282            Ok(Arc::new(store))
283        }
284    }
285}
286
287/// Creates a read-only storage instance based on configuration.
288///
289/// This function creates a storage backend that only supports read operations.
290/// For SlateDB, it uses `DbReader` which does not participate in fencing,
291/// allowing multiple readers to coexist with a single writer.
292///
293/// # Arguments
294///
295/// * `config` - The storage configuration specifying the backend type and settings.
296/// * `semantics` - System-specific semantics like merge operators.
297/// * `reader_options` - SlateDB reader options (e.g., manifest_poll_interval).
298///   These are passed directly to `DbReader::open` for SlateDB storage.
299///   Ignored for InMemory storage.
300///
301/// # Returns
302///
303/// Returns an `Arc<dyn StorageRead>` on success, or a `StorageError` on failure.
304pub async fn create_storage_read(
305    config: &StorageConfig,
306    runtime: StorageReaderRuntime,
307    semantics: StorageSemantics,
308    reader_options: slatedb::config::DbReaderOptions,
309) -> StorageResult<Arc<dyn StorageRead>> {
310    match config {
311        StorageConfig::InMemory => {
312            // InMemory has no fencing, reuse existing implementation
313            let storage = match semantics.merge_operator {
314                Some(op) => InMemoryStorage::with_merge_operator(op),
315                None => InMemoryStorage::new(),
316            };
317            Ok(Arc::new(storage))
318        }
319        StorageConfig::SlateDb(slate_config) => {
320            let object_store = if let Some(object_store) = &runtime.object_store {
321                object_store.clone()
322            } else {
323                create_object_store(&slate_config.object_store)?
324            };
325
326            let mut builder = DbReader::builder(slate_config.path.clone(), object_store)
327                .with_options(reader_options)
328                .with_metrics_recorder(Arc::new(MetricsRsRecorder));
329            if let Some(op) = semantics.merge_operator {
330                let adapter = SlateDbStorage::merge_operator_adapter(op);
331                builder = builder.with_merge_operator(Arc::new(adapter));
332            }
333            // Prefer runtime-provided cache, fall back to config. The
334            // runtime-provided cache is owned by the caller, so we don't hold
335            // a handle to close it on shutdown.
336            let mut managed_cache: Option<OwnedHybridCache> = None;
337            if let Some(cache) = runtime.block_cache {
338                builder = builder.with_db_cache(cache);
339            } else if let Some(managed) =
340                create_block_cache_from_config(&slate_config.block_cache).await?
341            {
342                builder = builder.with_db_cache(managed.db_cache);
343                managed_cache = Some(managed.hybrid);
344            }
345            let reader = builder.build().await.map_err(|e| {
346                StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
347            })?;
348            Ok(Arc::new(SlateDbStorageReader::new_with_managed_cache(
349                Arc::new(reader),
350                managed_cache,
351            )))
352        }
353    }
354}
355
356/// Creates a block cache from the serializable config, if present. Returns
357/// both the `DbCache` trait object handed to SlateDB and a `HybridCache`
358/// handle the caller keeps so it can close the cache deterministically on
359/// shutdown.
360async fn create_block_cache_from_config(
361    config: &Option<BlockCacheConfig>,
362) -> StorageResult<Option<ManagedBlockCache>> {
363    let Some(config) = config else {
364        return Ok(None);
365    };
366    match config {
367        BlockCacheConfig::FoyerHybrid(foyer_config) => {
368            use foyer::{
369                DirectFsDeviceOptions, Engine, HybridCacheBuilder, HybridCachePolicy,
370                LargeEngineOptions,
371            };
372
373            let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
374                StorageError::Storage(format!(
375                    "memory_capacity {} exceeds usize::MAX on this platform",
376                    foyer_config.memory_capacity
377                ))
378            })?;
379            let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
380                StorageError::Storage(format!(
381                    "disk_capacity {} exceeds usize::MAX on this platform",
382                    foyer_config.disk_capacity
383                ))
384            })?;
385
386            let buffer_pool_size = usize::try_from(foyer_config.effective_buffer_pool_size())
387                .map_err(|_| {
388                    StorageError::Storage(format!(
389                        "buffer_pool_size {} exceeds usize::MAX on this platform",
390                        foyer_config.effective_buffer_pool_size()
391                    ))
392                })?;
393            let submit_queue_size_threshold =
394                usize::try_from(foyer_config.submit_queue_size_threshold).map_err(|_| {
395                    StorageError::Storage(format!(
396                        "submit_queue_size_threshold {} exceeds usize::MAX on this platform",
397                        foyer_config.submit_queue_size_threshold
398                    ))
399                })?;
400
401            let policy = match foyer_config.write_policy {
402                super::config::FoyerWritePolicy::WriteOnInsertion => {
403                    HybridCachePolicy::WriteOnInsertion
404                }
405                super::config::FoyerWritePolicy::WriteOnEviction => {
406                    HybridCachePolicy::WriteOnEviction
407                }
408            };
409
410            let cache = HybridCacheBuilder::new()
411                .with_name("slatedb_block_cache")
412                .with_metrics_registry(Box::new(MetricsRsRegistry))
413                .with_policy(policy)
414                .memory(memory_capacity)
415                .with_weighter(|_, v: &CachedEntry| v.size())
416                .storage(Engine::Large(
417                    LargeEngineOptions::new()
418                        .with_flushers(foyer_config.flushers)
419                        .with_buffer_pool_size(buffer_pool_size)
420                        .with_submit_queue_size_threshold(submit_queue_size_threshold),
421                ))
422                .with_device_options(
423                    DirectFsDeviceOptions::new(&foyer_config.disk_path)
424                        .with_capacity(disk_capacity),
425                )
426                .build()
427                .await
428                .map_err(|e| {
429                    StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
430                })?;
431
432            info!(
433                memory_mb = foyer_config.memory_capacity / (1024 * 1024),
434                disk_mb = foyer_config.disk_capacity / (1024 * 1024),
435                disk_path = %foyer_config.disk_path,
436                write_policy = ?foyer_config.write_policy,
437                flushers = foyer_config.flushers,
438                buffer_pool_mb = foyer_config.effective_buffer_pool_size() / (1024 * 1024),
439                submit_queue_threshold_mb =
440                    foyer_config.submit_queue_size_threshold / (1024 * 1024),
441                "hybrid block cache enabled"
442            );
443
444            let db_cache =
445                Arc::new(FoyerHybridCache::new_with_cache(cache.clone())) as Arc<dyn DbCache>;
446            Ok(Some(ManagedBlockCache {
447                db_cache,
448                hybrid: cache,
449            }))
450        }
451    }
452}
453
#[cfg(test)]
mod tests {
    //! Factory tests: block-cache wiring for writer and reader paths,
    //! precedence of runtime-supplied caches, and capacity-overflow checks.

    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    // Builds a foyer hybrid cache config with the given capacities and disk
    // path; remaining knobs use fixed, test-friendly values.
    fn foyer_cache_config(
        memory_capacity: u64,
        disk_capacity: u64,
        disk_path: String,
    ) -> FoyerHybridCacheConfig {
        FoyerHybridCacheConfig {
            memory_capacity,
            disk_capacity,
            disk_path,
            write_policy: Default::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: 1024 * 1024 * 1024,
        }
    }

    // SlateDb config backed by a local-filesystem object store, no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                cache_dir.to_str().unwrap().to_string(),
            ))),
        };

        // First open a writer so the reader has a manifest to read
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        // Close writer before opening reader (SlateDB fencing)
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        // On 32-bit platforms, u64::MAX > usize::MAX triggers our overflow check.
        // On 64-bit this is a no-op, so gate on 32-bit.
        let config = BlockCacheConfig::FoyerHybrid(foyer_cache_config(
            u64::MAX,
            4 * 1024 * 1024,
            "/tmp/unused".to_string(),
        ));

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    /// Helper: creates a SlateDb config whose block_cache disk_path is a regular file
    /// (not a directory), which foyer deterministically rejects.
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_disk_path.to_string(),
            ))),
        })
    }

    // Note: foyer panics (unwrap inside DirectFsDevice) on invalid disk paths
    // rather than returning an error. We isolate the panic to the create_storage
    // call via tokio::spawn so setup unwrap() failures don't mask regressions.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        // Use a regular file as disk_path — foyer expects a directory
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        // Isolate the expected panic to just the build call
        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // First open a writer (without cache) so the reader has a manifest
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Isolate the expected panic to just the create_storage_read call
        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        // The config cache is deliberately invalid: if the reader tried to
        // build it, foyer would panic, so success proves precedence.
        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(foyer_cache_config(
                1024 * 1024,
                4 * 1024 * 1024,
                bad_path.to_str().unwrap().to_string(),
            ))),
        };

        // First open a writer (without cache) so the reader has a manifest
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Runtime cache should bypass the invalid config cache
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}