Skip to main content

common/storage/
mod.rs

1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod slate;
6pub mod util;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12
13use crate::BytesRange;
14
/// Time-to-live policy attached to a written record.
///
/// NOTE(review): the unit of [`Ttl::ExpireAfter`] is not fixed by this type;
/// it is interpreted by the storage backend (presumably milliseconds —
/// confirm against the backend before relying on it).
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum Ttl {
    /// No explicit TTL requested; presumably the backend's configured
    /// default applies (verify against the backend).
    #[default]
    Default,
    /// The record is written without an expiry.
    NoExpiry,
    /// The record expires after the given duration.
    ExpireAfter(u64),
}
22
23#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
24pub struct PutOptions {
25    pub ttl: Ttl,
26}
27
28/// Encapsulates a record being put along with options specific to the put.
29#[derive(Clone, Debug)]
30pub struct PutRecordOp {
31    pub record: Record,
32    pub options: PutOptions,
33}
34
35impl PutRecordOp {
36    pub fn new(record: Record) -> Self {
37        Self {
38            record,
39            options: PutOptions::default(),
40        }
41    }
42
43    pub fn new_with_options(record: Record, options: PutOptions) -> Self {
44        Self { record, options }
45    }
46
47    pub fn with_options(self, options: PutOptions) -> Self {
48        Self {
49            record: self.record,
50            options,
51        }
52    }
53}
54
55/// Converts a Record to a PutRecordOp with default options
56impl From<Record> for PutRecordOp {
57    fn from(record: Record) -> Self {
58        Self::new(record)
59    }
60}
61
62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
63pub struct MergeOptions {
64    pub ttl: Ttl,
65}
66
67/// Encapsulates a record written as part of a merge op along with options specific to the merge.
68#[derive(Clone, Debug)]
69pub struct MergeRecordOp {
70    pub record: Record,
71    pub options: MergeOptions,
72}
73
74impl MergeRecordOp {
75    pub fn new(record: Record) -> Self {
76        Self {
77            record,
78            options: MergeOptions::default(),
79        }
80    }
81
82    pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
83        Self { record, options }
84    }
85}
86
87/// Converts a Record to a PutRecordOp with default options
88impl From<Record> for MergeRecordOp {
89    fn from(record: Record) -> Self {
90        Self::new(record)
91    }
92}
93
94#[derive(Clone, Debug)]
95pub struct Record {
96    pub key: Bytes,
97    pub value: Bytes,
98}
99
100impl Record {
101    pub fn new(key: Bytes, value: Bytes) -> Self {
102        Self { key, value }
103    }
104
105    pub fn empty(key: Bytes) -> Self {
106        Self::new(key, Bytes::new())
107    }
108}
109
110#[derive(Clone, Debug)]
111pub enum RecordOp {
112    Put(PutRecordOp),
113    Merge(MergeRecordOp),
114    Delete(Bytes),
115}
116
/// Options for write operations.
///
/// Controls the durability behavior of the `*_with_options` write methods on
/// [`Storage`]: [`Storage::apply_with_options`], [`Storage::put_with_options`]
/// and [`Storage::merge_with_options`]. The option-less variants
/// ([`Storage::apply`], [`Storage::put`], [`Storage::merge`]) use
/// `WriteOptions::default()`.
#[derive(Debug, Clone, Default)]
pub struct WriteOptions {
    /// Whether to wait for the write to be durable before returning.
    ///
    /// When `true`, the operation will not return until the data has been
    /// persisted to durable storage (e.g., flushed to the WAL and acknowledged
    /// by the object store).
    ///
    /// When `false` (the default), the operation returns as soon as the data
    /// is in memory, providing lower latency but risking data loss on crash.
    pub await_durable: bool,
}
133
/// Error type for storage operations.
///
/// Each variant carries a human-readable message.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageError {
    /// Storage-related errors.
    Storage(String),
    /// Internal errors.
    Internal(String),
}
142
143impl std::error::Error for StorageError {}
144
145impl std::fmt::Display for StorageError {
146    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
147        match self {
148            StorageError::Storage(msg) => write!(f, "Storage error: {}", msg),
149            StorageError::Internal(msg) => write!(f, "Internal error: {}", msg),
150        }
151    }
152}
153
154impl StorageError {
155    /// Converts a storage error to StorageError::Storage.
156    pub fn from_storage(e: impl std::fmt::Display) -> Self {
157        StorageError::Storage(e.to_string())
158    }
159}
160
161/// Result type alias for storage operations
162pub type StorageResult<T> = std::result::Result<T, StorageError>;
163
/// Result of a write operation, containing the sequence number assigned by
/// the storage engine.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WriteResult {
    /// The sequence number assigned to this write by the underlying storage engine.
    pub seqnum: u64,
}
170
171/// Trait for merging existing values with new values.
172///
173/// Merge operators must be associative: `merge(merge(a, b), c) == merge(a, merge(b, c))`.
174/// This ensures consistent merging behavior regardless of the order of operations.
175pub trait MergeOperator: Send + Sync {
176    /// Merges an existing value with a new value to produce a merged result.
177    ///
178    /// # Arguments
179    /// * `key` - The key associated with the values being merged
180    /// * `existing_value` - The current value stored in the database (if any)
181    /// * `new_value` - The new value to merge with the existing value
182    ///
183    /// # Returns
184    /// The merged value.
185    fn merge(&self, key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes;
186
187    /// Merges a batch of operands with an optional existing value.
188    ///
189    /// The default implementation applies pairwise merging. Implementations can
190    /// override this for better performance by constructing a single merge result once
191    /// for multiple input values.
192    ///
193    /// # Arguments
194    /// * `key` - The key associated with the values being merged
195    /// * `existing_value` - The current value stored in the database (if any)
196    /// * `operands` - A slice of operands to merge, ordered from oldest to newest
197    fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes {
198        default_merge_batch(key, existing_value, operands, |k, e, v| self.merge(k, e, v))
199    }
200}
201
202pub fn default_merge_batch(
203    key: &Bytes,
204    existing_value: Option<Bytes>,
205    operands: &[Bytes],
206    merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
207) -> Bytes {
208    let mut result = existing_value;
209    for operand in operands {
210        result = Some(merge(key, result, operand.clone()));
211    }
212    result.expect("merge_batch called with no existing value and no operands")
213}
214
215/// Iterator over storage records.
216#[async_trait]
217pub trait StorageIterator {
218    /// Returns the next record from this iterator.
219    ///
220    /// Returns `Ok(None)` when the iterator is exhausted.
221    async fn next(&mut self) -> StorageResult<Option<Record>>;
222}
223
224/// Common read operations supported by both Storage and StorageSnapshot.
225///
226/// This trait provides the core read methods that are shared between full storage
227/// access and point-in-time snapshots. By extracting these common operations,
228/// we can write code that works with both storage types.
229#[async_trait]
230pub trait StorageRead: Send + Sync {
231    /// Retrieves a single record by exact key.
232    ///
233    /// Returns `Ok(None)` if the key is not present.
234    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
235
236    /// Returns an iterator over records in the given range.
237    ///
238    /// The returned iterator is owned and does not borrow from the storage,
239    /// allowing it to be stored in structs or passed across await points.
240    async fn scan_iter(
241        &self,
242        range: BytesRange,
243    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
244
245    /// Collects all records in the range into a Vec.
246    #[tracing::instrument(level = "trace", skip_all)]
247    async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
248        let mut iter = self.scan_iter(range).await?;
249        let mut records = Vec::new();
250        while let Some(record) = iter.next().await? {
251            records.push(record);
252        }
253        Ok(records)
254    }
255}
256
257/// A point-in-time snapshot of the storage layer.
258///
259/// Snapshots provide a consistent read-only view of the database at the time
260/// the snapshot was created. Reads from a snapshot will not see any subsequent
261/// writes to the underlying storage.
262#[async_trait]
263pub trait StorageSnapshot: StorageRead {}
264
265/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
266#[async_trait]
267pub trait Storage: StorageRead {
268    /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
269    ///
270    /// All operations in the batch are written together in a single `WriteBatch`,
271    /// so either all succeed or none are visible. This is the primary write method
272    /// used by flushers that produce a mix of operation types from a frozen delta.
273    ///
274    /// Uses `WriteOptions::default()` (`await_durable: false`).
275    async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
276        self.apply_with_options(ops, WriteOptions::default()).await
277    }
278
279    /// Applies a batch of mixed operations with custom write options.
280    ///
281    /// `options.await_durable` controls whether this call returns only after
282    /// the batch reaches durable storage.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if the batch cannot be applied.
287    async fn apply_with_options(
288        &self,
289        ops: Vec<RecordOp>,
290        options: WriteOptions,
291    ) -> StorageResult<WriteResult>;
292
293    /// Writes records to storage.
294    ///
295    /// Uses `WriteOptions::default()` (`await_durable: false`).
296    async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
297        self.put_with_options(records, WriteOptions::default())
298            .await
299    }
300
301    /// Writes records to storage with custom options.
302    ///
303    /// This method allows control over durability behavior. Use this when you
304    /// need to specify whether to wait for writes to be durable.
305    ///
306    /// # Arguments
307    ///
308    /// * `records` - The records to write
309    /// * `options` - Write options controlling durability behavior
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if any write in the batch fails.
314    async fn put_with_options(
315        &self,
316        records: Vec<PutRecordOp>,
317        options: WriteOptions,
318    ) -> StorageResult<WriteResult>;
319
320    /// Merges values for the given keys using the configured merge operator.
321    ///
322    /// This method requires the underlying storage engine to be configured with
323    /// a merge operator. If no merge operator is configured, this method will
324    /// return a `StorageError::Storage` error.
325    ///
326    /// The merge operation is atomic - all merges in the batch are applied
327    /// together or not at all.
328    ///
329    /// Uses `WriteOptions::default()` (`await_durable: false`).
330    async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
331        self.merge_with_options(records, WriteOptions::default())
332            .await
333    }
334
335    /// Merges values with custom write options.
336    ///
337    /// `options.await_durable` controls whether this call returns only after
338    /// the batch reaches durable storage.
339    ///
340    /// # Errors
341    ///
342    /// Returns an error if no merge operator is configured or if the merge
343    /// operation fails.
344    async fn merge_with_options(
345        &self,
346        records: Vec<MergeRecordOp>,
347        options: WriteOptions,
348    ) -> StorageResult<WriteResult>;
349
350    /// Creates a point-in-time snapshot of the storage.
351    ///
352    /// The snapshot provides a consistent read-only view of the database at the time
353    /// the snapshot was created. Reads from the snapshot will not see any subsequent
354    /// writes to the underlying storage.
355    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
356
357    /// Flushes all pending writes to durable storage.
358    ///
359    /// This ensures that all writes that have been acknowledged are persisted
360    /// to durable storage. For SlateDB, this flushes the memtable to the WAL
361    /// and object store.
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if durability cannot be established.
366    async fn flush(&self) -> StorageResult<()>;
367
368    /// Closes the storage, releasing any resources.
369    ///
370    /// This method should be called before dropping the storage to ensure
371    /// proper cleanup. For SlateDB, this releases the database fence.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if resource shutdown fails.
376    async fn close(&self) -> StorageResult<()>;
377
378    /// Subscribes to the durable sequence watermark.
379    ///
380    /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
381    /// confirmed durable by the storage engine. For SlateDB, this maps to
382    /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
383    /// "durable" so the watermark matches the latest written seqnum.
384    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
385
386    /// Registers storage engine metrics into the given Prometheus registry.
387    ///
388    /// The default implementation is a no-op. Storage backends that expose
389    /// internal metrics (e.g., SlateDB) override this to register gauges
390    /// that read live values on each scrape.
391    #[cfg(feature = "metrics")]
392    fn register_metrics(&self, _registry: &mut prometheus_client::registry::Registry) {}
393}