// common/storage/factory.rs
1//! Storage factory for creating storage instances from configuration.
2//!
3//! This module provides factory functions for creating storage backends
4//! based on configuration, supporting both InMemory and SlateDB backends.
5
6use std::sync::Arc;
7
8use super::config::{BlockCacheConfig, ObjectStoreConfig, StorageConfig};
9use super::in_memory::InMemoryStorage;
10use super::slate::{SlateDbStorage, SlateDbStorageReader};
11use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
12use slatedb::DbReader;
13use slatedb::config::Settings;
14pub use slatedb::db_cache::CachedEntry;
15use slatedb::db_cache::DbCache;
16pub use slatedb::db_cache::foyer::{FoyerCache, FoyerCacheOptions};
17pub use slatedb::db_cache::foyer_hybrid::FoyerHybridCache;
18use slatedb::object_store::{self, ObjectStore};
19pub use slatedb::{CompactorBuilder, DbBuilder};
20use tracing::info;
21
/// Builder for creating storage instances from configuration.
///
/// `StorageBuilder` provides layered access to the underlying SlateDB
/// [`DbBuilder`], replacing the previous `StorageRuntime` middleman.
///
/// # Example
///
/// ```rust,ignore
/// use common::{StorageBuilder, StorageSemantics, create_object_store};
/// use common::storage::factory::CompactorBuilder;
///
/// // Simple usage:
/// let storage = StorageBuilder::new(&config.storage).await?
///     .with_semantics(StorageSemantics::new().with_merge_operator(Arc::new(MyOp)))
///     .build()
///     .await?;
///
/// // Escape hatch for low-level SlateDB configuration.
/// // (`slate_config` below is the `SlateDbStorageConfig` inside
/// // `config.storage`, and `compaction_runtime` is a tokio runtime owned
/// // by the caller.)
/// let storage = StorageBuilder::new(&config.storage).await?
///     .map_slatedb(|db| {
///         let obj_store = create_object_store(&slate_config.object_store).unwrap();
///         db.with_compactor_builder(
///             CompactorBuilder::new(slate_config.path.clone(), obj_store)
///                 .with_runtime(compaction_runtime.handle().clone())
///         )
///     })
///     .build()
///     .await?;
/// ```
pub struct StorageBuilder {
    /// Backend-specific state: a sentinel for InMemory, or a boxed
    /// [`DbBuilder`] pre-configured from the `StorageConfig` for SlateDB.
    inner: StorageBuilderInner,
    /// System-specific semantics (merge operator); applied in `build()`.
    semantics: StorageSemantics,
}
55
/// Backend selector held by [`StorageBuilder`].
enum StorageBuilderInner {
    /// In-memory backend; `build()` constructs an `InMemoryStorage`.
    InMemory,
    /// SlateDB backend; carries the partially configured `DbBuilder`.
    SlateDb(Box<DbBuilder<String>>),
}
60
61impl StorageBuilder {
62    /// Creates a new `StorageBuilder` from a [`StorageConfig`].
63    ///
64    /// For SlateDB configs this creates a [`DbBuilder`] with the configured
65    /// path, object store, settings, and block cache (if configured). For
66    /// InMemory configs it stores a sentinel so that `build()` returns an
67    /// `InMemoryStorage`.
68    pub async fn new(config: &StorageConfig) -> StorageResult<Self> {
69        let inner = match config {
70            StorageConfig::InMemory => StorageBuilderInner::InMemory,
71            StorageConfig::SlateDb(slate_config) => {
72                let object_store = create_object_store(&slate_config.object_store)?;
73                let settings = match &slate_config.settings_path {
74                    Some(path) => Settings::from_file(path).map_err(|e| {
75                        StorageError::Storage(format!(
76                            "Failed to load SlateDB settings from {}: {}",
77                            path, e
78                        ))
79                    })?,
80                    None => Settings::load().unwrap_or_default(),
81                };
82                info!(
83                    "create slatedb storage with config: {:?}, settings: {:?}",
84                    slate_config, settings
85                );
86                let mut db_builder =
87                    DbBuilder::new(slate_config.path.clone(), object_store).with_settings(settings);
88                if let Some(cache) =
89                    create_block_cache_from_config(&slate_config.block_cache).await?
90                {
91                    db_builder = db_builder.with_db_cache(cache);
92                }
93                StorageBuilderInner::SlateDb(Box::new(db_builder))
94            }
95        };
96        Ok(Self {
97            inner,
98            semantics: StorageSemantics::default(),
99        })
100    }
101
102    /// Sets the [`StorageSemantics`] (merge operator, etc.) for this builder.
103    pub fn with_semantics(mut self, semantics: StorageSemantics) -> Self {
104        self.semantics = semantics;
105        self
106    }
107
108    /// Maps over the underlying [`DbBuilder`] for low-level SlateDB configuration.
109    ///
110    /// This is the escape hatch for any SlateDB knob not exposed by
111    /// `StorageBuilder` itself (compactor builder, block cache, GC runtime, etc.).
112    /// Use `db.with_db_cache(...)` inside the closure to override the
113    /// config-driven block cache.
114    ///
115    /// For InMemory storage this is a no-op.
116    pub fn map_slatedb(mut self, f: impl FnOnce(DbBuilder<String>) -> DbBuilder<String>) -> Self {
117        if let StorageBuilderInner::SlateDb(db) = self.inner {
118            self.inner = StorageBuilderInner::SlateDb(Box::new(f(*db)));
119        }
120        self
121    }
122
123    /// Builds the storage instance.
124    ///
125    /// Applies semantics (merge operator) to the `DbBuilder` and calls `.build()`.
126    pub async fn build(self) -> StorageResult<Arc<dyn Storage>> {
127        match self.inner {
128            StorageBuilderInner::InMemory => {
129                let storage = match self.semantics.merge_operator {
130                    Some(op) => InMemoryStorage::with_merge_operator(op),
131                    None => InMemoryStorage::new(),
132                };
133                Ok(Arc::new(storage))
134            }
135            StorageBuilderInner::SlateDb(db_builder) => {
136                let mut db_builder = *db_builder;
137                if let Some(op) = self.semantics.merge_operator {
138                    let adapter = SlateDbStorage::merge_operator_adapter(op);
139                    db_builder = db_builder.with_merge_operator(Arc::new(adapter));
140                }
141                let db = db_builder.build().await.map_err(|e| {
142                    StorageError::Storage(format!("Failed to create SlateDB: {}", e))
143                })?;
144                Ok(Arc::new(SlateDbStorage::new(Arc::new(db))))
145            }
146        }
147    }
148}
149
/// Runtime options for read-only storage instances.
///
/// This struct holds non-serializable runtime configuration for `DbReader`.
/// Unlike `StorageBuilder`, it only exposes options relevant to readers
/// (currently just block cache).
#[derive(Default)]
pub struct StorageReaderRuntime {
    // Optional block cache; in `create_storage_read` this takes precedence
    // over any cache built from the serializable config.
    pub(crate) block_cache: Option<Arc<dyn DbCache>>,
}
159
160impl StorageReaderRuntime {
161    /// Creates a new reader runtime with default options.
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Sets a block cache for SlateDB reads.
167    ///
168    /// When provided, the `DbReader` will use this cache for SST block lookups,
169    /// reducing disk I/O on repeated reads. Use `FoyerCache::new_with_opts`
170    /// to control capacity.
171    ///
172    /// This option only affects SlateDB storage; it is ignored for in-memory storage.
173    pub fn with_block_cache(mut self, cache: Arc<dyn DbCache>) -> Self {
174        self.block_cache = Some(cache);
175        self
176    }
177}
178
/// Storage semantics configured by system crates.
///
/// This struct holds semantic concerns like merge operators that are specific
/// to each system (log, timeseries, vector). End users should not use this
/// directly - each system configures its own semantics internally.
///
/// # Internal Use Only
///
/// This type is public so that system crates (timeseries, vector, log) can
/// access it, but it is not intended for end-user consumption.
///
/// # Example (for system crate implementers)
///
/// ```rust,ignore
/// // In timeseries crate:
/// let semantics = StorageSemantics::new()
///     .with_merge_operator(Arc::new(TimeSeriesMergeOperator));
/// let storage = StorageBuilder::new(&config).await?
///     .with_semantics(semantics)
///     .build()
///     .await?;
/// ```
#[derive(Default)]
pub struct StorageSemantics {
    // Operator combining values during merge/compaction; `None` means the
    // backend's plain overwrite semantics are used.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
205
206impl StorageSemantics {
207    /// Creates new storage semantics with default values.
208    pub fn new() -> Self {
209        Self::default()
210    }
211
212    /// Sets the merge operator for merge operations.
213    ///
214    /// The merge operator defines how values are combined during compaction.
215    /// Each system (timeseries, vector) defines its own merge semantics.
216    pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
217        self.merge_operator = Some(op);
218        self
219    }
220}
221
222/// Creates an object store from configuration without initializing SlateDB.
223///
224/// This is useful for cleanup operations where you need to access the object store
225/// after the database has been closed.
226pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
227    match config {
228        ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
229        ObjectStoreConfig::Aws(aws_config) => {
230            let store = object_store::aws::AmazonS3Builder::from_env()
231                .with_region(&aws_config.region)
232                .with_bucket_name(&aws_config.bucket)
233                .build()
234                .map_err(|e| {
235                    StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
236                })?;
237            Ok(Arc::new(store))
238        }
239        ObjectStoreConfig::Local(local_config) => {
240            std::fs::create_dir_all(&local_config.path).map_err(|e| {
241                StorageError::Storage(format!(
242                    "Failed to create storage directory '{}': {}",
243                    local_config.path, e
244                ))
245            })?;
246            let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
247                .map_err(|e| {
248                StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
249            })?;
250            Ok(Arc::new(store))
251        }
252    }
253}
254
255/// Creates a read-only storage instance based on configuration.
256///
257/// This function creates a storage backend that only supports read operations.
258/// For SlateDB, it uses `DbReader` which does not participate in fencing,
259/// allowing multiple readers to coexist with a single writer.
260///
261/// # Arguments
262///
263/// * `config` - The storage configuration specifying the backend type and settings.
264/// * `semantics` - System-specific semantics like merge operators.
265/// * `reader_options` - SlateDB reader options (e.g., manifest_poll_interval).
266///   These are passed directly to `DbReader::open` for SlateDB storage.
267///   Ignored for InMemory storage.
268///
269/// # Returns
270///
271/// Returns an `Arc<dyn StorageRead>` on success, or a `StorageError` on failure.
272pub async fn create_storage_read(
273    config: &StorageConfig,
274    runtime: StorageReaderRuntime,
275    semantics: StorageSemantics,
276    reader_options: slatedb::config::DbReaderOptions,
277) -> StorageResult<Arc<dyn StorageRead>> {
278    match config {
279        StorageConfig::InMemory => {
280            // InMemory has no fencing, reuse existing implementation
281            let storage = match semantics.merge_operator {
282                Some(op) => InMemoryStorage::with_merge_operator(op),
283                None => InMemoryStorage::new(),
284            };
285            Ok(Arc::new(storage))
286        }
287        StorageConfig::SlateDb(slate_config) => {
288            let object_store = create_object_store(&slate_config.object_store)?;
289
290            let mut options = reader_options;
291            if let Some(op) = semantics.merge_operator {
292                let adapter = SlateDbStorage::merge_operator_adapter(op);
293                options.merge_operator = Some(Arc::new(adapter));
294            }
295            // Prefer runtime-provided cache, fall back to config
296            if let Some(cache) = runtime.block_cache {
297                options.block_cache = Some(cache);
298            } else if let Some(cache) =
299                create_block_cache_from_config(&slate_config.block_cache).await?
300            {
301                options.block_cache = Some(cache);
302            }
303            let reader = DbReader::open(
304                slate_config.path.clone(),
305                object_store,
306                None, // checkpoint_id - use latest state
307                options,
308            )
309            .await
310            .map_err(|e| {
311                StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
312            })?;
313            Ok(Arc::new(SlateDbStorageReader::new(Arc::new(reader))))
314        }
315    }
316}
317
318/// Creates a block cache from the serializable config, if present.
319async fn create_block_cache_from_config(
320    config: &Option<BlockCacheConfig>,
321) -> StorageResult<Option<Arc<dyn DbCache>>> {
322    let Some(config) = config else {
323        return Ok(None);
324    };
325    match config {
326        BlockCacheConfig::FoyerHybrid(foyer_config) => {
327            use foyer::{DirectFsDeviceOptions, Engine, HybridCacheBuilder};
328
329            let memory_capacity = usize::try_from(foyer_config.memory_capacity).map_err(|_| {
330                StorageError::Storage(format!(
331                    "memory_capacity {} exceeds usize::MAX on this platform",
332                    foyer_config.memory_capacity
333                ))
334            })?;
335            let disk_capacity = usize::try_from(foyer_config.disk_capacity).map_err(|_| {
336                StorageError::Storage(format!(
337                    "disk_capacity {} exceeds usize::MAX on this platform",
338                    foyer_config.disk_capacity
339                ))
340            })?;
341
342            let cache = HybridCacheBuilder::new()
343                .with_name("slatedb_block_cache")
344                .memory(memory_capacity)
345                .with_weighter(|_, v: &CachedEntry| v.size())
346                .storage(Engine::large())
347                .with_device_options(
348                    DirectFsDeviceOptions::new(&foyer_config.disk_path)
349                        .with_capacity(disk_capacity),
350                )
351                .build()
352                .await
353                .map_err(|e| {
354                    StorageError::Storage(format!("Failed to create hybrid cache: {}", e))
355                })?;
356
357            info!(
358                memory_mb = foyer_config.memory_capacity / (1024 * 1024),
359                disk_mb = foyer_config.disk_capacity / (1024 * 1024),
360                disk_path = %foyer_config.disk_path,
361                "hybrid block cache enabled"
362            );
363
364            Ok(Some(
365                Arc::new(FoyerHybridCache::new_with_cache(cache)) as Arc<dyn DbCache>
366            ))
367        }
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::config::{
        FoyerHybridCacheConfig, LocalObjectStoreConfig, SlateDbStorageConfig,
    };

    /// Helper: minimal SlateDb config with a local object store rooted at
    /// `dir` and no block cache.
    fn slatedb_config_with_local_dir(dir: &std::path::Path) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: None,
        })
    }

    #[tokio::test]
    async fn should_create_storage_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        // Valid hybrid cache: 1 MiB memory, 4 MiB disk in a real directory.
        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: cache_dir.to_str().unwrap().to_string(),
            })),
        });

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(
            storage.is_ok(),
            "expected config-driven block cache to work"
        );
    }

    #[tokio::test]
    async fn should_create_reader_with_block_cache_from_config() {
        let tmp = tempfile::tempdir().unwrap();
        let cache_dir = tmp.path().join("block-cache");
        std::fs::create_dir_all(&cache_dir).unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: cache_dir.to_str().unwrap().to_string(),
            })),
        };

        // First open a writer so the reader has a manifest to read
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(slate_config.clone()))
            .await
            .unwrap()
            .build()
            .await
            .unwrap();
        // Close writer before opening reader (SlateDB fencing)
        drop(writer);

        let reader = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            StorageReaderRuntime::new(),
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            reader.is_ok(),
            "expected config-driven block cache on reader to work"
        );
    }

    #[cfg(target_pointer_width = "32")]
    #[tokio::test]
    async fn should_error_when_capacity_exceeds_usize() {
        // On 32-bit platforms, u64::MAX > usize::MAX triggers our overflow check.
        // On 64-bit this is a no-op, so gate on 32-bit.
        let config = BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
            memory_capacity: u64::MAX,
            disk_capacity: 4 * 1024 * 1024,
            disk_path: "/tmp/unused".to_string(),
        });

        let result = create_block_cache_from_config(&Some(config)).await;
        assert!(result.is_err());
    }

    /// Helper: creates a SlateDb config whose block_cache disk_path is a regular file
    /// (not a directory), which foyer deterministically rejects.
    fn config_with_invalid_block_cache_disk_path(
        obj_dir: &std::path::Path,
        bad_disk_path: &str,
    ) -> StorageConfig {
        StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: obj_dir.to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_disk_path.to_string(),
            })),
        })
    }

    // Note: foyer panics (unwrap inside DirectFsDevice) on invalid disk paths
    // rather than returning an error. We isolate the panic to the create_storage
    // call via tokio::spawn so setup unwrap() failures don't mask regressions.
    #[tokio::test]
    async fn should_fail_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        // Use a regular file as disk_path — foyer expects a directory
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let config = config_with_invalid_block_cache_disk_path(
            &tmp.path().join("obj"),
            bad_path.to_str().unwrap(),
        );

        // Isolate the expected panic to just the build call
        let handle = tokio::spawn(async move {
            let _ = StorageBuilder::new(&config).await.unwrap().build().await;
        });
        let result = handle.await;
        // JoinHandle reports a panicked task via JoinError::is_panic().
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path"
        );
    }

    #[tokio::test]
    async fn should_fail_reader_when_config_cache_disk_path_is_invalid() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_path.to_str().unwrap().to_string(),
            })),
        };

        // First open a writer (without cache) so the reader has a manifest
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Isolate the expected panic to just the create_storage_read call
        let handle = tokio::spawn(async move {
            let _ = create_storage_read(
                &StorageConfig::SlateDb(slate_config),
                StorageReaderRuntime::new(),
                StorageSemantics::new(),
                slatedb::config::DbReaderOptions::default(),
            )
            .await;
        });
        let result = handle.await;
        assert!(
            result.is_err() && result.unwrap_err().is_panic(),
            "expected foyer to panic on invalid disk_path for reader"
        );
    }

    #[tokio::test]
    async fn reader_runtime_cache_should_take_precedence_over_config_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let bad_path = tmp.path().join("not-a-dir");
        std::fs::write(&bad_path, b"").unwrap();

        // Config cache is deliberately invalid: if it were consulted, foyer
        // would panic; success proves the runtime cache short-circuits it.
        let slate_config = SlateDbStorageConfig {
            path: "data".to_string(),
            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
                path: tmp.path().join("obj").to_str().unwrap().to_string(),
            }),
            settings_path: None,
            block_cache: Some(BlockCacheConfig::FoyerHybrid(FoyerHybridCacheConfig {
                memory_capacity: 1024 * 1024,
                disk_capacity: 4 * 1024 * 1024,
                disk_path: bad_path.to_str().unwrap().to_string(),
            })),
        };

        // First open a writer (without cache) so the reader has a manifest
        let writer = StorageBuilder::new(&StorageConfig::SlateDb(SlateDbStorageConfig {
            block_cache: None,
            ..slate_config.clone()
        }))
        .await
        .unwrap()
        .build()
        .await
        .unwrap();
        drop(writer);

        // Runtime cache should bypass the invalid config cache
        let runtime_cache = FoyerCache::new_with_opts(FoyerCacheOptions {
            max_capacity: 1024 * 1024,
            shards: 1,
        });
        let runtime = StorageReaderRuntime::new().with_block_cache(Arc::new(runtime_cache));

        let result = create_storage_read(
            &StorageConfig::SlateDb(slate_config),
            runtime,
            StorageSemantics::new(),
            slatedb::config::DbReaderOptions::default(),
        )
        .await;

        assert!(
            result.is_ok(),
            "reader runtime cache should take precedence, skipping invalid config cache"
        );
    }

    #[tokio::test]
    async fn should_return_none_when_no_block_cache_configured() {
        let result = create_block_cache_from_config(&None).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn should_work_without_block_cache() {
        let tmp = tempfile::tempdir().unwrap();
        let config = slatedb_config_with_local_dir(tmp.path());

        let storage = StorageBuilder::new(&config).await.unwrap().build().await;

        assert!(storage.is_ok());
    }
}
635}