Skip to main content

common/storage/
config.rs

1//! Storage configuration types.
2//!
3//! This module provides configuration structures for different storage backends,
4//! allowing services to configure storage type (InMemory or SlateDB) via config files
5//! or environment variables.
6
7use serde::{Deserialize, Serialize};
8
9/// Top-level storage configuration.
10///
11/// Defaults to `SlateDb` with a local `/tmp/opendata-storage` directory.
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
13#[serde(tag = "type")]
14pub enum StorageConfig {
15    InMemory,
16    SlateDb(SlateDbStorageConfig),
17}
18
19impl Default for StorageConfig {
20    fn default() -> Self {
21        StorageConfig::SlateDb(SlateDbStorageConfig {
22            path: "data".to_string(),
23            object_store: ObjectStoreConfig::Local(LocalObjectStoreConfig {
24                path: ".data".to_string(),
25            }),
26            settings_path: None,
27            block_cache: None,
28        })
29    }
30}
31
32/// SlateDB-specific configuration.
33#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
34pub struct SlateDbStorageConfig {
35    /// Path prefix for SlateDB data in the object store.
36    pub path: String,
37
38    /// Object store provider configuration.
39    pub object_store: ObjectStoreConfig,
40
41    /// Optional path to SlateDB settings file (TOML/YAML/JSON).
42    ///
43    /// If not provided, uses SlateDB's `Settings::load()` which checks for
44    /// `SlateDb.toml`, `SlateDb.json`, `SlateDb.yaml` in the working directory
45    /// and merges any `SLATEDB_` prefixed environment variables.
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub settings_path: Option<String>,
48
49    /// Optional block cache for SST block lookups.
50    ///
51    /// When configured, reduces object store reads by caching hot blocks
52    /// in memory and/or on local disk.
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub block_cache: Option<BlockCacheConfig>,
55}
56
57/// Block cache configuration for SlateDB.
58#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
59#[serde(tag = "type")]
60pub enum BlockCacheConfig {
61    /// Two-tier cache using foyer: in-memory + on-disk (ideally NVMe).
62    FoyerHybrid(FoyerHybridCacheConfig),
63}
64
65/// Write policy for foyer's hybrid cache.
66///
67/// Controls when entries are written to the disk tier.
68#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
69pub enum FoyerWritePolicy {
70    /// Write to disk when an entry is inserted into the memory cache.
71    /// Ensures every cached block is also persisted to the disk tier.
72    #[default]
73    WriteOnInsertion,
74    /// Write to disk only when an entry is evicted from the memory cache.
75    /// This is foyer's default policy.
76    WriteOnEviction,
77}
78
79/// Configuration for foyer's hybrid (memory + disk) block cache.
80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
81pub struct FoyerHybridCacheConfig {
82    /// In-memory cache capacity in bytes.
83    pub memory_capacity: u64,
84    /// On-disk cache capacity in bytes.
85    pub disk_capacity: u64,
86    /// Path for the on-disk cache directory.
87    pub disk_path: String,
88    /// Write policy for the hybrid cache. Default: `WriteOnInsertion`.
89    #[serde(default)]
90    pub write_policy: FoyerWritePolicy,
91    /// Number of flush threads for the large engine. Default: 4.
92    #[serde(default = "default_flushers")]
93    pub flushers: usize,
94    /// Buffer pool size in bytes for the large engine flush pipeline.
95    /// Each flusher double-buffers, so actual allocation is ~2x this value.
96    /// Default: `memory_capacity / 32` (computed at build time when absent).
97    #[serde(default, skip_serializing_if = "Option::is_none")]
98    pub buffer_pool_size: Option<u64>,
99    /// Submit queue size threshold in bytes. Entries are dropped when
100    /// the queue exceeds this limit. Default: 1 GiB.
101    #[serde(default = "default_submit_queue_size_threshold")]
102    pub submit_queue_size_threshold: u64,
103}
104
/// Serde default for the `flushers` field: four flush threads.
fn default_flushers() -> usize {
    const DEFAULT_FLUSHER_COUNT: usize = 4;
    DEFAULT_FLUSHER_COUNT
}
108
/// Serde default for the `submit_queue_size_threshold` field: 1 GiB, in bytes.
fn default_submit_queue_size_threshold() -> u64 {
    const ONE_GIB: u64 = 1 << 30;
    ONE_GIB
}
112
113impl FoyerHybridCacheConfig {
114    /// Returns the effective buffer pool size: explicit value if set,
115    /// otherwise `memory_capacity / 32`.
116    pub fn effective_buffer_pool_size(&self) -> u64 {
117        self.buffer_pool_size.unwrap_or(self.memory_capacity / 32)
118    }
119}
120
121impl Default for SlateDbStorageConfig {
122    fn default() -> Self {
123        Self {
124            path: "data".to_string(),
125            object_store: ObjectStoreConfig::default(),
126            settings_path: None,
127            block_cache: None,
128        }
129    }
130}
131
132impl StorageConfig {
133    /// Returns a new config with the path modified by appending a suffix.
134    ///
135    /// For SlateDB storage, appends the suffix to the path (e.g., "data" -> "data/0").
136    /// For InMemory storage, returns a clone unchanged.
137    pub fn with_path_suffix(&self, suffix: &str) -> Self {
138        match self {
139            StorageConfig::InMemory => StorageConfig::InMemory,
140            StorageConfig::SlateDb(config) => StorageConfig::SlateDb(SlateDbStorageConfig {
141                path: format!("{}/{}", config.path, suffix),
142                object_store: config.object_store.clone(),
143                settings_path: config.settings_path.clone(),
144                block_cache: config.block_cache.clone(),
145            }),
146        }
147    }
148}
149
150/// Object store provider configuration for SlateDB.
151#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
152#[serde(tag = "type")]
153pub enum ObjectStoreConfig {
154    /// In-memory object store (useful for testing and development).
155    #[default]
156    InMemory,
157
158    /// AWS S3 object store.
159    Aws(AwsObjectStoreConfig),
160
161    /// Local filesystem object store.
162    Local(LocalObjectStoreConfig),
163}
164
165/// AWS S3 object store configuration.
166#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
167pub struct AwsObjectStoreConfig {
168    /// AWS region (e.g., "us-west-2").
169    pub region: String,
170
171    /// S3 bucket name.
172    pub bucket: String,
173}
174
175/// Local filesystem object store configuration.
176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
177pub struct LocalObjectStoreConfig {
178    /// Path to the local directory for storage.
179    pub path: String,
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a YAML document into a `StorageConfig`, panicking if the
    /// document does not parse.
    fn parse_yaml(yaml: &str) -> StorageConfig {
        serde_yaml::from_str(yaml).unwrap()
    }

    #[test]
    fn should_default_to_slatedb_with_local_data_dir() {
        // given/when
        let StorageConfig::SlateDb(slate_config) = StorageConfig::default() else {
            panic!("Expected SlateDb config as default");
        };

        // then
        let expected_store = ObjectStoreConfig::Local(LocalObjectStoreConfig {
            path: ".data".to_string(),
        });
        assert_eq!(slate_config.path, "data");
        assert_eq!(slate_config.object_store, expected_store);
    }

    #[test]
    fn should_deserialize_in_memory_config() {
        // given/when
        let parsed = parse_yaml(r#"type: InMemory"#);

        // then
        assert_eq!(parsed, StorageConfig::InMemory);
    }

    #[test]
    fn should_deserialize_slatedb_config_with_local_object_store() {
        // given/when
        let parsed = parse_yaml(
            r#"
type: SlateDb
path: my-data
object_store:
  type: Local
  path: /tmp/slatedb
"#,
        );

        // then
        let StorageConfig::SlateDb(slate_config) = parsed else {
            panic!("Expected SlateDb config");
        };
        let expected_store = ObjectStoreConfig::Local(LocalObjectStoreConfig {
            path: "/tmp/slatedb".to_string(),
        });
        assert_eq!(slate_config.path, "my-data");
        assert_eq!(slate_config.object_store, expected_store);
        assert!(slate_config.settings_path.is_none());
    }

    #[test]
    fn should_deserialize_slatedb_config_with_aws_object_store() {
        // given/when
        let parsed = parse_yaml(
            r#"
type: SlateDb
path: my-data
object_store:
  type: Aws
  region: us-west-2
  bucket: my-bucket
settings_path: slatedb.toml
"#,
        );

        // then
        let StorageConfig::SlateDb(slate_config) = parsed else {
            panic!("Expected SlateDb config");
        };
        let expected_store = ObjectStoreConfig::Aws(AwsObjectStoreConfig {
            region: "us-west-2".to_string(),
            bucket: "my-bucket".to_string(),
        });
        assert_eq!(slate_config.path, "my-data");
        assert_eq!(slate_config.object_store, expected_store);
        assert_eq!(slate_config.settings_path, Some("slatedb.toml".to_string()));
    }

    #[test]
    fn should_deserialize_slatedb_config_with_in_memory_object_store() {
        // given/when
        let parsed = parse_yaml(
            r#"
type: SlateDb
path: test-data
object_store:
  type: InMemory
"#,
        );

        // then
        let StorageConfig::SlateDb(slate_config) = parsed else {
            panic!("Expected SlateDb config");
        };
        assert_eq!(slate_config.path, "test-data");
        assert_eq!(slate_config.object_store, ObjectStoreConfig::InMemory);
    }

    #[test]
    fn should_serialize_slatedb_config() {
        // given
        let object_store = ObjectStoreConfig::Local(LocalObjectStoreConfig {
            path: "/tmp/slatedb".to_string(),
        });
        let config = StorageConfig::SlateDb(SlateDbStorageConfig {
            path: "my-data".to_string(),
            object_store,
            settings_path: None,
            block_cache: None,
        });

        // when
        let yaml = serde_yaml::to_string(&config).unwrap();

        // then
        for expected in ["type: SlateDb", "path: my-data", "type: Local"] {
            assert!(yaml.contains(expected));
        }
        // settings_path and block_cache should be omitted when None
        for omitted in ["settings_path", "block_cache"] {
            assert!(!yaml.contains(omitted));
        }
    }

    #[test]
    fn should_deserialize_block_cache_config() {
        // given/when
        let parsed = parse_yaml(
            r#"
type: SlateDb
path: data
object_store:
  type: InMemory
block_cache:
  type: FoyerHybrid
  memory_capacity: 8589934592
  disk_capacity: 150323855360
  disk_path: /mnt/nvme/block-cache
"#,
        );

        // then
        let StorageConfig::SlateDb(slate_config) = parsed else {
            panic!("Expected SlateDb config");
        };
        let BlockCacheConfig::FoyerHybrid(foyer) =
            slate_config.block_cache.expect("block_cache should be set");

        assert_eq!(foyer.memory_capacity, 8589934592);
        assert_eq!(foyer.disk_capacity, 150323855360);
        assert_eq!(foyer.disk_path, "/mnt/nvme/block-cache");
        // fields absent from the YAML should get their defaults
        assert_eq!(foyer.write_policy, FoyerWritePolicy::WriteOnInsertion);
        assert_eq!(foyer.flushers, 4);
        assert!(foyer.buffer_pool_size.is_none());
        assert_eq!(foyer.submit_queue_size_threshold, 1024 * 1024 * 1024);
        // effective buffer pool = memory_capacity / 32
        assert_eq!(foyer.effective_buffer_pool_size(), 8589934592 / 32);
    }

    #[test]
    fn should_deserialize_block_cache_with_explicit_engine_options() {
        // given/when
        let parsed = parse_yaml(
            r#"
type: SlateDb
path: data
object_store:
  type: InMemory
block_cache:
  type: FoyerHybrid
  memory_capacity: 4294967296
  disk_capacity: 10737418240
  disk_path: /mnt/nvme/cache
  write_policy: WriteOnEviction
  flushers: 2
  buffer_pool_size: 134217728
  submit_queue_size_threshold: 536870912
"#,
        );

        // then
        let StorageConfig::SlateDb(slate_config) = parsed else {
            panic!("Expected SlateDb config");
        };
        let BlockCacheConfig::FoyerHybrid(foyer) =
            slate_config.block_cache.expect("block_cache should be set");

        assert_eq!(foyer.write_policy, FoyerWritePolicy::WriteOnEviction);
        assert_eq!(foyer.flushers, 2);
        assert_eq!(foyer.buffer_pool_size, Some(134217728));
        assert_eq!(foyer.submit_queue_size_threshold, 536870912);
        // explicit value overrides derivation
        assert_eq!(foyer.effective_buffer_pool_size(), 134217728);
    }

    #[test]
    fn should_derive_buffer_pool_size_from_memory_capacity() {
        const MIB: u64 = 1024 * 1024;
        const GIB: u64 = 1024 * MIB;

        // given
        let config = FoyerHybridCacheConfig {
            memory_capacity: 8 * GIB,
            disk_capacity: 100 * GIB,
            disk_path: "/tmp/cache".to_string(),
            write_policy: FoyerWritePolicy::default(),
            flushers: 4,
            buffer_pool_size: None,
            submit_queue_size_threshold: GIB,
        };

        // when
        let effective = config.effective_buffer_pool_size();

        // then: 256 MiB = 8 GiB / 32
        assert_eq!(effective, 256 * MIB);
    }

    #[test]
    fn should_default_block_cache_to_none() {
        // given/when
        let parsed = parse_yaml(
            r#"
type: SlateDb
path: data
object_store:
  type: InMemory
"#,
        );

        // then
        let StorageConfig::SlateDb(slate_config) = parsed else {
            panic!("Expected SlateDb config");
        };
        assert!(slate_config.block_cache.is_none());
    }
}