// common/storage/factory.rs
1//! Storage factory for creating storage instances from configuration.
2//!
3//! This module provides factory functions for creating storage backends
4//! based on configuration, supporting both InMemory and SlateDB backends.
5
6use std::sync::Arc;
7
8use slatedb::config::Settings;
9use slatedb::object_store::{self, ObjectStore};
10use slatedb::{DbBuilder, DbReader};
11use tokio::runtime::Handle;
12
13use super::config::{ObjectStoreConfig, SlateDbStorageConfig, StorageConfig};
14use super::in_memory::InMemoryStorage;
15use super::slate::{SlateDbStorage, SlateDbStorageReader};
16use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
17
/// Runtime options for storage that cannot be serialized.
///
/// This struct holds non-serializable runtime configuration like tokio
/// runtime handles. Users can configure these options and pass them to
/// system builders (e.g., `LogDbBuilder`, `TsdbBuilder`).
///
/// # Example
///
/// ```rust,ignore
/// use common::StorageRuntime;
///
/// // Create a separate runtime for compaction
/// let compaction_runtime = tokio::runtime::Builder::new_multi_thread()
///     .worker_threads(2)
///     .enable_all()
///     .build()
///     .unwrap();
///
/// let runtime = StorageRuntime::new()
///     .with_compaction_runtime(compaction_runtime.handle().clone());
///
/// // Pass to a system builder
/// let mut builder = LogDbBuilder::new(config);
/// *builder.storage_mut() = runtime;
/// let log = builder.build().await?;
/// ```
#[derive(Default)]
pub struct StorageRuntime {
    // Handle to a dedicated tokio runtime for SlateDB compaction tasks.
    // `None` means compaction shares the runtime used for user operations.
    // Only consulted when building SlateDB storage; ignored for in-memory.
    pub(crate) compaction_runtime: Option<Handle>,
}
48
49impl StorageRuntime {
50    /// Creates a new storage runtime with default options.
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    /// Sets a separate runtime for SlateDB compaction tasks.
56    ///
57    /// When provided, SlateDB's compaction tasks will run on this runtime
58    /// instead of the runtime used for user operations. This is important
59    /// when calling the database from sync code using `block_on`, as it
60    /// prevents deadlocks between user operations and background compaction.
61    ///
62    /// This option only affects SlateDB storage; it is ignored for in-memory storage.
63    pub fn with_compaction_runtime(mut self, handle: Handle) -> Self {
64        self.compaction_runtime = Some(handle);
65        self
66    }
67}
68
/// Storage semantics configured by system crates.
///
/// This struct holds semantic concerns like merge operators that are specific
/// to each system (log, timeseries, vector). End users should not use this
/// directly - each system configures its own semantics internally.
///
/// # Internal Use Only
///
/// This type is public so that system crates (timeseries, vector, log) can
/// access it, but it is not intended for end-user consumption.
///
/// # Example (for system crate implementers)
///
/// ```rust,ignore
/// // In timeseries crate:
/// let semantics = StorageSemantics::new()
///     .with_merge_operator(Arc::new(TimeSeriesMergeOperator));
/// let storage = create_storage(&config, runtime, semantics).await?;
/// ```
#[derive(Default)]
pub struct StorageSemantics {
    // System-specific merge operator applied by the backend during merges.
    // `None` means the backend is created without merge support.
    pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
}
92
93impl StorageSemantics {
94    /// Creates new storage semantics with default values.
95    pub fn new() -> Self {
96        Self::default()
97    }
98
99    /// Sets the merge operator for merge operations.
100    ///
101    /// The merge operator defines how values are combined during compaction.
102    /// Each system (timeseries, vector) defines its own merge semantics.
103    pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
104        self.merge_operator = Some(op);
105        self
106    }
107}
108
109/// Creates an object store from configuration without initializing SlateDB.
110///
111/// This is useful for cleanup operations where you need to access the object store
112/// after the database has been closed.
113pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
114    match config {
115        ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
116        ObjectStoreConfig::Aws(aws_config) => {
117            let store = object_store::aws::AmazonS3Builder::from_env()
118                .with_region(&aws_config.region)
119                .with_bucket_name(&aws_config.bucket)
120                .build()
121                .map_err(|e| {
122                    StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
123                })?;
124            Ok(Arc::new(store))
125        }
126        ObjectStoreConfig::Local(local_config) => {
127            std::fs::create_dir_all(&local_config.path).map_err(|e| {
128                StorageError::Storage(format!(
129                    "Failed to create storage directory '{}': {}",
130                    local_config.path, e
131                ))
132            })?;
133            let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
134                .map_err(|e| {
135                StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
136            })?;
137            Ok(Arc::new(store))
138        }
139    }
140}
141
142/// Creates a storage instance based on configuration, runtime options, and semantics.
143///
144/// This is the primary factory function for creating storage backends. It combines:
145/// - `config`: Serializable storage configuration (backend type, paths, etc.)
146/// - `runtime`: Non-serializable runtime options (compaction runtime handle)
147/// - `semantics`: System-specific semantics (merge operators)
148///
149/// # Arguments
150///
151/// * `config` - The storage configuration specifying the backend type and settings.
152/// * `runtime` - Runtime options like compaction runtime handles.
153/// * `semantics` - System-specific semantics like merge operators.
154///
155/// # Returns
156///
157/// Returns an `Arc<dyn Storage>` on success, or a `StorageError` on failure.
158///
159/// # Example (for system crate implementers)
160///
161/// ```rust,ignore
162/// // In a system crate's builder:
163/// let storage = create_storage(
164///     &self.config.storage,
165///     self.storage_runtime.unwrap_or_default(),
166///     StorageSemantics::new().with_merge_operator(Arc::new(MyMergeOp)),
167/// ).await?;
168/// ```
169pub async fn create_storage(
170    config: &StorageConfig,
171    runtime: StorageRuntime,
172    semantics: StorageSemantics,
173) -> StorageResult<Arc<dyn Storage>> {
174    match config {
175        StorageConfig::InMemory => {
176            let storage = match semantics.merge_operator {
177                Some(op) => InMemoryStorage::with_merge_operator(op),
178                None => InMemoryStorage::new(),
179            };
180            Ok(Arc::new(storage))
181        }
182        StorageConfig::SlateDb(slate_config) => {
183            let storage = create_slatedb_storage(slate_config, runtime, semantics).await?;
184            Ok(Arc::new(storage))
185        }
186    }
187}
188
189/// Creates a read-only storage instance based on configuration.
190///
191/// This function creates a storage backend that only supports read operations.
192/// For SlateDB, it uses `DbReader` which does not participate in fencing,
193/// allowing multiple readers to coexist with a single writer.
194///
195/// # Arguments
196///
197/// * `config` - The storage configuration specifying the backend type and settings.
198/// * `semantics` - System-specific semantics like merge operators.
199/// * `reader_options` - SlateDB reader options (e.g., manifest_poll_interval).
200///   These are passed directly to `DbReader::open` for SlateDB storage.
201///   Ignored for InMemory storage.
202///
203/// # Returns
204///
205/// Returns an `Arc<dyn StorageRead>` on success, or a `StorageError` on failure.
206pub async fn create_storage_read(
207    config: &StorageConfig,
208    semantics: StorageSemantics,
209    reader_options: slatedb::config::DbReaderOptions,
210) -> StorageResult<Arc<dyn StorageRead>> {
211    match config {
212        StorageConfig::InMemory => {
213            // InMemory has no fencing, reuse existing implementation
214            let storage = match semantics.merge_operator {
215                Some(op) => InMemoryStorage::with_merge_operator(op),
216                None => InMemoryStorage::new(),
217            };
218            Ok(Arc::new(storage))
219        }
220        StorageConfig::SlateDb(slate_config) => {
221            let object_store = create_object_store(&slate_config.object_store)?;
222
223            let mut options = reader_options;
224            if let Some(op) = semantics.merge_operator {
225                let adapter = SlateDbStorage::merge_operator_adapter(op);
226                options.merge_operator = Some(Arc::new(adapter));
227            }
228            let reader = DbReader::open(
229                slate_config.path.clone(),
230                object_store,
231                None, // checkpoint_id - use latest state
232                options,
233            )
234            .await
235            .map_err(|e| {
236                StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
237            })?;
238            Ok(Arc::new(SlateDbStorageReader::new(Arc::new(reader))))
239        }
240    }
241}
242
/// Creates a SlateDB-backed storage instance from configuration.
///
/// Resolves the object store, loads SlateDB settings, and assembles the
/// database via `DbBuilder`, wiring in the optional merge operator from
/// `semantics` and the optional compaction runtime from `runtime`.
///
/// # Errors
///
/// Returns `StorageError::Storage` when the object store cannot be created,
/// when an explicitly configured settings file fails to load, or when the
/// database itself fails to build.
async fn create_slatedb_storage(
    config: &SlateDbStorageConfig,
    runtime: StorageRuntime,
    semantics: StorageSemantics,
) -> StorageResult<SlateDbStorage> {
    let object_store = create_object_store(&config.object_store)?;

    // Load SlateDB settings. An explicit settings file is an error if it can't
    // be loaded; with no file configured, `Settings::load()` is best-effort and
    // silently falls back to defaults.
    let settings = match &config.settings_path {
        Some(path) => Settings::from_file(path).map_err(|e| {
            StorageError::Storage(format!(
                "Failed to load SlateDB settings from {}: {}",
                path, e
            ))
        })?,
        None => Settings::load().unwrap_or_default(),
    };

    // Build the database
    let mut db_builder = DbBuilder::new(config.path.clone(), object_store).with_settings(settings);

    // Add merge operator if provided
    if let Some(op) = semantics.merge_operator {
        let adapter = SlateDbStorage::merge_operator_adapter(op);
        db_builder = db_builder.with_merge_operator(Arc::new(adapter));
    }

    // Add compaction runtime if provided
    if let Some(handle) = runtime.compaction_runtime {
        db_builder = db_builder.with_compaction_runtime(handle);
    }

    let db = db_builder
        .build()
        .await
        .map_err(|e| StorageError::Storage(format!("Failed to create SlateDB: {}", e)))?;

    Ok(SlateDbStorage::new(Arc::new(db)))
}
281}