common/storage/mod.rs
1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod slate;
6pub mod util;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12
13use crate::BytesRange;
14
/// Time-to-live policy attached to a written record.
///
/// The derived `Default` implementation yields [`Ttl::Default`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Ttl {
    /// Do not pick an explicit policy for this write.
    ///
    /// NOTE(review): presumably the storage backend then applies its own
    /// configured TTL — confirm against the backend implementations.
    #[default]
    Default,
    /// The record should never expire.
    NoExpiry,
    /// The record should expire after the given amount of time.
    ///
    /// NOTE(review): the unit of the `u64` is not stated in this module —
    /// confirm against the backend before relying on it.
    ExpireAfter(u64),
}
22
23#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
24pub struct PutOptions {
25 pub ttl: Ttl,
26}
27
28/// Encapsulates a record being put along with options specific to the put.
29#[derive(Clone, Debug)]
30pub struct PutRecordOp {
31 pub record: Record,
32 pub options: PutOptions,
33}
34
35impl PutRecordOp {
36 pub fn new(record: Record) -> Self {
37 Self {
38 record,
39 options: PutOptions::default(),
40 }
41 }
42
43 pub fn new_with_options(record: Record, options: PutOptions) -> Self {
44 Self { record, options }
45 }
46
47 pub fn with_options(self, options: PutOptions) -> Self {
48 Self {
49 record: self.record,
50 options,
51 }
52 }
53}
54
55/// Converts a Record to a PutRecordOp with default options
56impl From<Record> for PutRecordOp {
57 fn from(record: Record) -> Self {
58 Self::new(record)
59 }
60}
61
62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
63pub struct MergeOptions {
64 pub ttl: Ttl,
65}
66
67/// Encapsulates a record written as part of a merge op along with options specific to the merge.
68#[derive(Clone, Debug)]
69pub struct MergeRecordOp {
70 pub record: Record,
71 pub options: MergeOptions,
72}
73
74impl MergeRecordOp {
75 pub fn new(record: Record) -> Self {
76 Self {
77 record,
78 options: MergeOptions::default(),
79 }
80 }
81
82 pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
83 Self { record, options }
84 }
85}
86
87/// Converts a Record to a PutRecordOp with default options
88impl From<Record> for MergeRecordOp {
89 fn from(record: Record) -> Self {
90 Self::new(record)
91 }
92}
93
94#[derive(Clone, Debug)]
95pub struct Record {
96 pub key: Bytes,
97 pub value: Bytes,
98}
99
100impl Record {
101 pub fn new(key: Bytes, value: Bytes) -> Self {
102 Self { key, value }
103 }
104
105 pub fn empty(key: Bytes) -> Self {
106 Self::new(key, Bytes::new())
107 }
108}
109
110#[derive(Clone, Debug)]
111pub enum RecordOp {
112 Put(PutRecordOp),
113 Merge(MergeRecordOp),
114 Delete(Bytes),
115}
116
/// Options for write operations.
///
/// Passed to the `*_with_options` write methods on [`Storage`] (for example
/// [`Storage::put_with_options`]); the plain variants such as
/// [`Storage::put`] use `WriteOptions::default()`.
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Whether the call should wait for durability before returning.
    ///
    /// * `true` — the operation does not return until the data has been
    ///   persisted to durable storage (e.g., flushed to the WAL and
    ///   acknowledged by the object store).
    /// * `false` (the default) — the operation returns as soon as the data is
    ///   in memory: lower latency, but the write may be lost on a crash.
    pub await_durable: bool,
}
133
/// Error type returned by storage operations.
///
/// Both variants carry a human-readable message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// A storage-related failure.
    Storage(String),
    /// An internal failure.
    Internal(String),
}
142
143impl std::error::Error for StorageError {}
144
145impl std::fmt::Display for StorageError {
146 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
147 match self {
148 StorageError::Storage(msg) => write!(f, "Storage error: {}", msg),
149 StorageError::Internal(msg) => write!(f, "Internal error: {}", msg),
150 }
151 }
152}
153
154impl StorageError {
155 /// Converts a storage error to StorageError::Storage.
156 pub fn from_storage(e: impl std::fmt::Display) -> Self {
157 StorageError::Storage(e.to_string())
158 }
159}
160
161/// Result type alias for storage operations
162pub type StorageResult<T> = std::result::Result<T, StorageError>;
163
/// Outcome of a successful write: the sequence number the storage engine
/// assigned to it.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct WriteResult {
    /// Sequence number assigned to this write by the underlying storage engine.
    pub seqnum: u64,
}
170
171/// Trait for merging existing values with new values.
172///
173/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
174/// This ensures consistent merging behavior regardless of the order of operations.
175pub trait MergeOperator: Send + Sync {
176 /// Merges a batch of operands with an optional existing value.
177 ///
178 /// # Arguments
179 /// * `key` - The key associated with the values being merged
180 /// * `existing_value` - The current value stored in the database (if any)
181 /// * `operands` - A slice of operands to merge, ordered from oldest to newest
182 fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
183}
184
185pub fn default_merge_batch(
186 key: &Bytes,
187 existing_value: Option<Bytes>,
188 operands: &[Bytes],
189 merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
190) -> Bytes {
191 let mut result = existing_value;
192 for operand in operands {
193 result = Some(merge(key, result, operand.clone()));
194 }
195 result.expect("merge_batch called with no existing value and no operands")
196}
197
/// Iterator over storage records.
///
/// Unlike `std::iter::Iterator`, advancing is asynchronous and fallible:
/// each call to [`StorageIterator::next`] must be awaited and may return an
/// error instead of a record.
#[async_trait]
pub trait StorageIterator {
    /// Returns the next record from this iterator.
    ///
    /// Returns `Ok(None)` when the iterator is exhausted.
    ///
    /// # Errors
    ///
    /// Returns a [`StorageError`] if the implementation fails to produce the
    /// next record.
    async fn next(&mut self) -> StorageResult<Option<Record>>;
}
206
207/// Common read operations supported by both Storage and StorageSnapshot.
208///
209/// This trait provides the core read methods that are shared between full storage
210/// access and point-in-time snapshots. By extracting these common operations,
211/// we can write code that works with both storage types.
212#[async_trait]
213pub trait StorageRead: Send + Sync {
214 /// Retrieves a single record by exact key.
215 ///
216 /// Returns `Ok(None)` if the key is not present.
217 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
218
219 /// Returns an iterator over records in the given range.
220 ///
221 /// The returned iterator is owned and does not borrow from the storage,
222 /// allowing it to be stored in structs or passed across await points.
223 async fn scan_iter(
224 &self,
225 range: BytesRange,
226 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
227
228 /// Collects all records in the range into a Vec.
229 #[tracing::instrument(level = "trace", skip_all)]
230 async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
231 let mut iter = self.scan_iter(range).await?;
232 let mut records = Vec::new();
233 while let Some(record) = iter.next().await? {
234 records.push(record);
235 }
236 Ok(records)
237 }
238}
239
/// A point-in-time snapshot of the storage layer.
///
/// Snapshots provide a consistent read-only view of the database at the time
/// the snapshot was created. Reads from a snapshot will not see any subsequent
/// writes to the underlying storage.
///
/// This trait declares no methods of its own; all reads go through the
/// [`StorageRead`] supertrait.
#[async_trait]
pub trait StorageSnapshot: StorageRead {}
247
248/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
249#[async_trait]
250pub trait Storage: StorageRead {
251 /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
252 ///
253 /// All operations in the batch are written together in a single `WriteBatch`,
254 /// so either all succeed or none are visible. This is the primary write method
255 /// used by flushers that produce a mix of operation types from a frozen delta.
256 ///
257 /// Uses `WriteOptions::default()` (`await_durable: false`).
258 async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
259 self.apply_with_options(ops, WriteOptions::default()).await
260 }
261
262 /// Applies a batch of mixed operations with custom write options.
263 ///
264 /// `options.await_durable` controls whether this call returns only after
265 /// the batch reaches durable storage.
266 ///
267 /// # Errors
268 ///
269 /// Returns an error if the batch cannot be applied.
270 async fn apply_with_options(
271 &self,
272 ops: Vec<RecordOp>,
273 options: WriteOptions,
274 ) -> StorageResult<WriteResult>;
275
276 /// Writes records to storage.
277 ///
278 /// Uses `WriteOptions::default()` (`await_durable: false`).
279 async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
280 self.put_with_options(records, WriteOptions::default())
281 .await
282 }
283
284 /// Writes records to storage with custom options.
285 ///
286 /// This method allows control over durability behavior. Use this when you
287 /// need to specify whether to wait for writes to be durable.
288 ///
289 /// # Arguments
290 ///
291 /// * `records` - The records to write
292 /// * `options` - Write options controlling durability behavior
293 ///
294 /// # Errors
295 ///
296 /// Returns an error if any write in the batch fails.
297 async fn put_with_options(
298 &self,
299 records: Vec<PutRecordOp>,
300 options: WriteOptions,
301 ) -> StorageResult<WriteResult>;
302
303 /// Merges values for the given keys using the configured merge operator.
304 ///
305 /// This method requires the underlying storage engine to be configured with
306 /// a merge operator. If no merge operator is configured, this method will
307 /// return a `StorageError::Storage` error.
308 ///
309 /// The merge operation is atomic - all merges in the batch are applied
310 /// together or not at all.
311 ///
312 /// Uses `WriteOptions::default()` (`await_durable: false`).
313 async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
314 self.merge_with_options(records, WriteOptions::default())
315 .await
316 }
317
318 /// Merges values with custom write options.
319 ///
320 /// `options.await_durable` controls whether this call returns only after
321 /// the batch reaches durable storage.
322 ///
323 /// # Errors
324 ///
325 /// Returns an error if no merge operator is configured or if the merge
326 /// operation fails.
327 async fn merge_with_options(
328 &self,
329 records: Vec<MergeRecordOp>,
330 options: WriteOptions,
331 ) -> StorageResult<WriteResult>;
332
333 /// Creates a point-in-time snapshot of the storage.
334 ///
335 /// The snapshot provides a consistent read-only view of the database at the time
336 /// the snapshot was created. Reads from the snapshot will not see any subsequent
337 /// writes to the underlying storage.
338 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
339
340 /// Flushes all pending writes to durable storage.
341 ///
342 /// This ensures that all writes that have been acknowledged are persisted
343 /// to durable storage. For SlateDB, this flushes the memtable to the WAL
344 /// and object store.
345 ///
346 /// # Errors
347 ///
348 /// Returns an error if durability cannot be established.
349 async fn flush(&self) -> StorageResult<()>;
350
351 /// Closes the storage, releasing any resources.
352 ///
353 /// This method should be called before dropping the storage to ensure
354 /// proper cleanup. For SlateDB, this releases the database fence.
355 ///
356 /// # Errors
357 ///
358 /// Returns an error if resource shutdown fails.
359 async fn close(&self) -> StorageResult<()>;
360
361 /// Subscribes to the durable sequence watermark.
362 ///
363 /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
364 /// confirmed durable by the storage engine. For SlateDB, this maps to
365 /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
366 /// "durable" so the watermark matches the latest written seqnum.
367 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
368
369 /// Registers storage engine metrics into the given Prometheus registry.
370 ///
371 /// The default implementation is a no-op. Storage backends that expose
372 /// internal metrics (e.g., SlateDB) override this to register gauges
373 /// that read live values on each scrape.
374 #[cfg(feature = "metrics")]
375 fn register_metrics(&self, _registry: &mut prometheus_client::registry::Registry) {}
376}