common/storage/factory.rs
//! Storage factory for creating storage instances from configuration.
//!
//! This module provides factory functions for creating storage backends
//! based on configuration, supporting both InMemory and SlateDB backends.

6use std::sync::Arc;
7
8use slatedb::config::Settings;
9use slatedb::object_store::{self, ObjectStore};
10use slatedb::{DbBuilder, DbReader};
11use tokio::runtime::Handle;
12
13use super::config::{ObjectStoreConfig, SlateDbStorageConfig, StorageConfig};
14use super::in_memory::InMemoryStorage;
15use super::slate::{SlateDbStorage, SlateDbStorageReader};
16use super::{MergeOperator, Storage, StorageError, StorageRead, StorageResult};
17
18/// Runtime options for storage that cannot be serialized.
19///
20/// This struct holds non-serializable runtime configuration like tokio
21/// runtime handles. Users can configure these options and pass them to
22/// system builders (e.g., `LogDbBuilder`, `TsdbBuilder`).
23///
24/// # Example
25///
26/// ```rust,ignore
27/// use common::StorageRuntime;
28///
29/// // Create a separate runtime for compaction
30/// let compaction_runtime = tokio::runtime::Builder::new_multi_thread()
31/// .worker_threads(2)
32/// .enable_all()
33/// .build()
34/// .unwrap();
35///
36/// let runtime = StorageRuntime::new()
37/// .with_compaction_runtime(compaction_runtime.handle().clone());
38///
39/// // Pass to a system builder
40/// let mut builder = LogDbBuilder::new(config);
41/// *builder.storage_mut() = runtime;
42/// let log = builder.build().await?;
43/// ```
44#[derive(Default)]
45pub struct StorageRuntime {
46 pub(crate) compaction_runtime: Option<Handle>,
47}
48
49impl StorageRuntime {
50 /// Creates a new storage runtime with default options.
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 /// Sets a separate runtime for SlateDB compaction tasks.
56 ///
57 /// When provided, SlateDB's compaction tasks will run on this runtime
58 /// instead of the runtime used for user operations. This is important
59 /// when calling the database from sync code using `block_on`, as it
60 /// prevents deadlocks between user operations and background compaction.
61 ///
62 /// This option only affects SlateDB storage; it is ignored for in-memory storage.
63 pub fn with_compaction_runtime(mut self, handle: Handle) -> Self {
64 self.compaction_runtime = Some(handle);
65 self
66 }
67}
68
69/// Storage semantics configured by system crates.
70///
71/// This struct holds semantic concerns like merge operators that are specific
72/// to each system (log, timeseries, vector). End users should not use this
73/// directly - each system configures its own semantics internally.
74///
75/// # Internal Use Only
76///
77/// This type is public so that system crates (timeseries, vector, log) can
78/// access it, but it is not intended for end-user consumption.
79///
80/// # Example (for system crate implementers)
81///
82/// ```rust,ignore
83/// // In timeseries crate:
84/// let semantics = StorageSemantics::new()
85/// .with_merge_operator(Arc::new(TimeSeriesMergeOperator));
86/// let storage = create_storage(&config, runtime, semantics).await?;
87/// ```
88#[derive(Default)]
89pub struct StorageSemantics {
90 pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
91}
92
93impl StorageSemantics {
94 /// Creates new storage semantics with default values.
95 pub fn new() -> Self {
96 Self::default()
97 }
98
99 /// Sets the merge operator for merge operations.
100 ///
101 /// The merge operator defines how values are combined during compaction.
102 /// Each system (timeseries, vector) defines its own merge semantics.
103 pub fn with_merge_operator(mut self, op: Arc<dyn MergeOperator>) -> Self {
104 self.merge_operator = Some(op);
105 self
106 }
107}
108
109/// Creates an object store from configuration without initializing SlateDB.
110///
111/// This is useful for cleanup operations where you need to access the object store
112/// after the database has been closed.
113pub fn create_object_store(config: &ObjectStoreConfig) -> StorageResult<Arc<dyn ObjectStore>> {
114 match config {
115 ObjectStoreConfig::InMemory => Ok(Arc::new(object_store::memory::InMemory::new())),
116 ObjectStoreConfig::Aws(aws_config) => {
117 let store = object_store::aws::AmazonS3Builder::from_env()
118 .with_region(&aws_config.region)
119 .with_bucket_name(&aws_config.bucket)
120 .build()
121 .map_err(|e| {
122 StorageError::Storage(format!("Failed to create AWS S3 store: {}", e))
123 })?;
124 Ok(Arc::new(store))
125 }
126 ObjectStoreConfig::Local(local_config) => {
127 std::fs::create_dir_all(&local_config.path).map_err(|e| {
128 StorageError::Storage(format!(
129 "Failed to create storage directory '{}': {}",
130 local_config.path, e
131 ))
132 })?;
133 let store = object_store::local::LocalFileSystem::new_with_prefix(&local_config.path)
134 .map_err(|e| {
135 StorageError::Storage(format!("Failed to create local filesystem store: {}", e))
136 })?;
137 Ok(Arc::new(store))
138 }
139 }
140}
141
142/// Creates a storage instance based on configuration, runtime options, and semantics.
143///
144/// This is the primary factory function for creating storage backends. It combines:
145/// - `config`: Serializable storage configuration (backend type, paths, etc.)
146/// - `runtime`: Non-serializable runtime options (compaction runtime handle)
147/// - `semantics`: System-specific semantics (merge operators)
148///
149/// # Arguments
150///
151/// * `config` - The storage configuration specifying the backend type and settings.
152/// * `runtime` - Runtime options like compaction runtime handles.
153/// * `semantics` - System-specific semantics like merge operators.
154///
155/// # Returns
156///
157/// Returns an `Arc<dyn Storage>` on success, or a `StorageError` on failure.
158///
159/// # Example (for system crate implementers)
160///
161/// ```rust,ignore
162/// // In a system crate's builder:
163/// let storage = create_storage(
164/// &self.config.storage,
165/// self.storage_runtime.unwrap_or_default(),
166/// StorageSemantics::new().with_merge_operator(Arc::new(MyMergeOp)),
167/// ).await?;
168/// ```
169pub async fn create_storage(
170 config: &StorageConfig,
171 runtime: StorageRuntime,
172 semantics: StorageSemantics,
173) -> StorageResult<Arc<dyn Storage>> {
174 match config {
175 StorageConfig::InMemory => {
176 let storage = match semantics.merge_operator {
177 Some(op) => InMemoryStorage::with_merge_operator(op),
178 None => InMemoryStorage::new(),
179 };
180 Ok(Arc::new(storage))
181 }
182 StorageConfig::SlateDb(slate_config) => {
183 let storage = create_slatedb_storage(slate_config, runtime, semantics).await?;
184 Ok(Arc::new(storage))
185 }
186 }
187}
188
189/// Creates a read-only storage instance based on configuration.
190///
191/// This function creates a storage backend that only supports read operations.
192/// For SlateDB, it uses `DbReader` which does not participate in fencing,
193/// allowing multiple readers to coexist with a single writer.
194///
195/// # Arguments
196///
197/// * `config` - The storage configuration specifying the backend type and settings.
198/// * `semantics` - System-specific semantics like merge operators.
199/// * `reader_options` - SlateDB reader options (e.g., manifest_poll_interval).
200/// These are passed directly to `DbReader::open` for SlateDB storage.
201/// Ignored for InMemory storage.
202///
203/// # Returns
204///
205/// Returns an `Arc<dyn StorageRead>` on success, or a `StorageError` on failure.
206pub async fn create_storage_read(
207 config: &StorageConfig,
208 semantics: StorageSemantics,
209 reader_options: slatedb::config::DbReaderOptions,
210) -> StorageResult<Arc<dyn StorageRead>> {
211 match config {
212 StorageConfig::InMemory => {
213 // InMemory has no fencing, reuse existing implementation
214 let storage = match semantics.merge_operator {
215 Some(op) => InMemoryStorage::with_merge_operator(op),
216 None => InMemoryStorage::new(),
217 };
218 Ok(Arc::new(storage))
219 }
220 StorageConfig::SlateDb(slate_config) => {
221 let object_store = create_object_store(&slate_config.object_store)?;
222
223 let mut options = reader_options;
224 if let Some(op) = semantics.merge_operator {
225 let adapter = SlateDbStorage::merge_operator_adapter(op);
226 options.merge_operator = Some(Arc::new(adapter));
227 }
228 let reader = DbReader::open(
229 slate_config.path.clone(),
230 object_store,
231 None, // checkpoint_id - use latest state
232 options,
233 )
234 .await
235 .map_err(|e| {
236 StorageError::Storage(format!("Failed to create SlateDB reader: {}", e))
237 })?;
238 Ok(Arc::new(SlateDbStorageReader::new(Arc::new(reader))))
239 }
240 }
241}
242
243async fn create_slatedb_storage(
244 config: &SlateDbStorageConfig,
245 runtime: StorageRuntime,
246 semantics: StorageSemantics,
247) -> StorageResult<SlateDbStorage> {
248 let object_store = create_object_store(&config.object_store)?;
249
250 // Load SlateDB settings
251 let settings = match &config.settings_path {
252 Some(path) => Settings::from_file(path).map_err(|e| {
253 StorageError::Storage(format!(
254 "Failed to load SlateDB settings from {}: {}",
255 path, e
256 ))
257 })?,
258 None => Settings::load().unwrap_or_default(),
259 };
260
261 // Build the database
262 let mut db_builder = DbBuilder::new(config.path.clone(), object_store).with_settings(settings);
263
264 // Add merge operator if provided
265 if let Some(op) = semantics.merge_operator {
266 let adapter = SlateDbStorage::merge_operator_adapter(op);
267 db_builder = db_builder.with_merge_operator(Arc::new(adapter));
268 }
269
270 // Add compaction runtime if provided
271 if let Some(handle) = runtime.compaction_runtime {
272 db_builder = db_builder.with_compaction_runtime(handle);
273 }
274
275 let db = db_builder
276 .build()
277 .await
278 .map_err(|e| StorageError::Storage(format!("Failed to create SlateDB: {}", e)))?;
279
280 Ok(SlateDbStorage::new(Arc::new(db)))
281}