Skip to main content

common/storage/
mod.rs

1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod metrics_recorder;
6pub mod slate;
7pub mod sst_blocks;
8pub mod util;
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use uuid::Uuid;
15
16use crate::BytesRange;
17
18/// Identifies a checkpoint of a storage backend at a point in time.
19///
20/// Checkpoints capture a manifest snapshot that a reader can later open
21/// against to get a consistent view of the database at the time the
22/// checkpoint was created.
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24pub struct CheckpointInfo {
25    pub id: Uuid,
26    pub manifest_id: u64,
27}
28
/// Time-to-live policy attached to a write.
///
/// NOTE(review): how each variant is interpreted (units, clock, epoch) is
/// decided by the backend that consumes it and is not visible in this module;
/// the per-variant notes below are read off the names and should be confirmed.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum Ttl {
    /// Presumably defers to the backend's configured default TTL.
    /// This is the value returned by `Ttl::default()`.
    #[default]
    Default,
    /// Presumably: the record never expires.
    NoExpiry,
    /// Presumably: the record expires after the given duration.
    /// NOTE(review): unit not visible here — confirm against the backend.
    ExpireAfter(u64),
    /// Presumably: the record expires at the given absolute time.
    /// NOTE(review): unit/epoch not visible here — confirm against the backend.
    ExpireAt(i64),
}

/// Per-record options for a put.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct PutOptions {
    /// TTL policy for the record; defaults to [`Ttl::Default`].
    pub ttl: Ttl,
}
42
43/// Encapsulates a record being put along with options specific to the put.
44#[derive(Clone, Debug)]
45pub struct PutRecordOp {
46    pub record: Record,
47    pub options: PutOptions,
48}
49
50impl PutRecordOp {
51    pub fn new(record: Record) -> Self {
52        Self {
53            record,
54            options: PutOptions::default(),
55        }
56    }
57
58    pub fn new_with_options(record: Record, options: PutOptions) -> Self {
59        Self { record, options }
60    }
61
62    pub fn with_options(self, options: PutOptions) -> Self {
63        Self {
64            record: self.record,
65            options,
66        }
67    }
68}
69
70/// Converts a Record to a PutRecordOp with default options
71impl From<Record> for PutRecordOp {
72    fn from(record: Record) -> Self {
73        Self::new(record)
74    }
75}
76
77#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
78pub struct MergeOptions {
79    pub ttl: Ttl,
80}
81
82/// Encapsulates a record written as part of a merge op along with options specific to the merge.
83#[derive(Clone, Debug)]
84pub struct MergeRecordOp {
85    pub record: Record,
86    pub options: MergeOptions,
87}
88
89impl MergeRecordOp {
90    pub fn new(record: Record) -> Self {
91        Self {
92            record,
93            options: MergeOptions::default(),
94        }
95    }
96
97    pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
98        Self { record, options }
99    }
100}
101
102/// Converts a Record to a PutRecordOp with default options
103impl From<Record> for MergeRecordOp {
104    fn from(record: Record) -> Self {
105        Self::new(record)
106    }
107}
108
109#[derive(Clone, Debug)]
110pub struct Record {
111    pub key: Bytes,
112    pub value: Bytes,
113}
114
115impl Record {
116    pub fn new(key: Bytes, value: Bytes) -> Self {
117        Self { key, value }
118    }
119
120    pub fn empty(key: Bytes) -> Self {
121        Self::new(key, Bytes::new())
122    }
123}
124
125#[derive(Clone, Debug)]
126pub enum RecordOp {
127    Put(PutRecordOp),
128    Merge(MergeRecordOp),
129    Delete(Bytes),
130}
131
132/// Options for write operations.
133///
134/// Controls the durability behavior of write operations like [`Storage::put`]
135/// and [`Storage::put_with_options`].
136#[derive(Debug, Clone, Default)]
137pub struct WriteOptions {
138    /// Whether to wait for the write to be durable before returning.
139    ///
140    /// When `true`, the operation will not return until the data has been
141    /// persisted to durable storage (e.g., flushed to the WAL and acknowledged
142    /// by the object store).
143    ///
144    /// When `false` (the default), the operation returns as soon as the data
145    /// is in memory, providing lower latency but risking data loss on crash.
146    pub await_durable: bool,
147}
148
149/// Error type for storage operations
150#[derive(Debug, Clone, PartialEq, Eq)]
151pub enum StorageError {
152    /// Storage-related errors
153    Storage(String),
154    /// Internal errors
155    Internal(String),
156}
157
158impl std::error::Error for StorageError {}
159
160impl std::fmt::Display for StorageError {
161    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
162        match self {
163            StorageError::Storage(msg) => write!(f, "Storage error: {}", msg),
164            StorageError::Internal(msg) => write!(f, "Internal error: {}", msg),
165        }
166    }
167}
168
169impl StorageError {
170    /// Converts a storage error to StorageError::Storage.
171    pub fn from_storage(e: impl std::fmt::Display) -> Self {
172        StorageError::Storage(e.to_string())
173    }
174}
175
176/// Result type alias for storage operations
177pub type StorageResult<T> = std::result::Result<T, StorageError>;
178
179/// Result of a write operation, containing the sequence number assigned by the storage engine.
180#[derive(Clone, Copy, Debug, PartialEq, Eq)]
181pub struct WriteResult {
182    /// The sequence number assigned to this write by the underlying storage engine.
183    pub seqnum: u64,
184}
185
186/// Trait for merging existing values with new values.
187///
188/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
189/// This ensures consistent merging behavior regardless of the order of operations.
190pub trait MergeOperator: Send + Sync {
191    /// Merges a batch of operands with an optional existing value.
192    ///
193    /// # Arguments
194    /// * `key` - The key associated with the values being merged
195    /// * `existing_value` - The current value stored in the database (if any)
196    /// * `operands` - A slice of operands to merge, ordered from oldest to newest
197    fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
198}
199
200pub fn default_merge_batch(
201    key: &Bytes,
202    existing_value: Option<Bytes>,
203    operands: &[Bytes],
204    merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
205) -> Bytes {
206    let mut result = existing_value;
207    for operand in operands {
208        result = Some(merge(key, result, operand.clone()));
209    }
210    result.expect("merge_batch called with no existing value and no operands")
211}
212
213/// Iterator over storage records.
214#[async_trait]
215pub trait StorageIterator {
216    /// Returns the next record from this iterator.
217    ///
218    /// Returns `Ok(None)` when the iterator is exhausted.
219    async fn next(&mut self) -> StorageResult<Option<Record>>;
220}
221
222/// Common read operations supported by both Storage and StorageSnapshot.
223///
224/// This trait provides the core read methods that are shared between full storage
225/// access and point-in-time snapshots. By extracting these common operations,
226/// we can write code that works with both storage types.
227#[async_trait]
228pub trait StorageRead: Send + Sync {
229    /// Retrieves a single record by exact key.
230    ///
231    /// Returns `Ok(None)` if the key is not present.
232    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
233
234    /// Returns an iterator over records in the given range.
235    ///
236    /// The returned iterator is owned and does not borrow from the storage,
237    /// allowing it to be stored in structs or passed across await points.
238    async fn scan_iter(
239        &self,
240        range: BytesRange,
241    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
242
243    /// Collects all records in the range into a Vec.
244    #[tracing::instrument(level = "trace", skip_all)]
245    async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
246        let mut iter = self.scan_iter(range).await?;
247        let mut records = Vec::new();
248        while let Some(record) = iter.next().await? {
249            records.push(record);
250        }
251        Ok(records)
252    }
253
254    /// Closes the storage, releasing any resources.
255    ///
256    /// This method should be called before dropping the storage to ensure
257    /// proper cleanup. For SlateDB, this releases the database fence.
258    ///
259    /// # Errors
260    ///
261    /// Returns an error if resource shutdown fails.
262    async fn close(&self) -> StorageResult<()> {
263        Ok(())
264    }
265}
266
267/// A point-in-time snapshot of the storage layer.
268///
269/// Snapshots provide a consistent read-only view of the database at the time
270/// the snapshot was created. Reads from a snapshot will not see any subsequent
271/// writes to the underlying storage.
272#[async_trait]
273pub trait StorageSnapshot: StorageRead {}
274
275/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
276#[async_trait]
277pub trait Storage: StorageRead {
278    /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
279    ///
280    /// All operations in the batch are written together in a single `WriteBatch`,
281    /// so either all succeed or none are visible. This is the primary write method
282    /// used by flushers that produce a mix of operation types from a frozen delta.
283    ///
284    /// Uses `WriteOptions::default()` (`await_durable: false`).
285    async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
286        self.apply_with_options(ops, WriteOptions::default()).await
287    }
288
289    /// Applies a batch of mixed operations with custom write options.
290    ///
291    /// `options.await_durable` controls whether this call returns only after
292    /// the batch reaches durable storage.
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if the batch cannot be applied.
297    async fn apply_with_options(
298        &self,
299        ops: Vec<RecordOp>,
300        options: WriteOptions,
301    ) -> StorageResult<WriteResult>;
302
303    /// Writes records to storage.
304    ///
305    /// Uses `WriteOptions::default()` (`await_durable: false`).
306    async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
307        self.put_with_options(records, WriteOptions::default())
308            .await
309    }
310
311    /// Writes records to storage with custom options.
312    ///
313    /// This method allows control over durability behavior. Use this when you
314    /// need to specify whether to wait for writes to be durable.
315    ///
316    /// # Arguments
317    ///
318    /// * `records` - The records to write
319    /// * `options` - Write options controlling durability behavior
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if any write in the batch fails.
324    async fn put_with_options(
325        &self,
326        records: Vec<PutRecordOp>,
327        options: WriteOptions,
328    ) -> StorageResult<WriteResult>;
329
330    /// Merges values for the given keys using the configured merge operator.
331    ///
332    /// This method requires the underlying storage engine to be configured with
333    /// a merge operator. If no merge operator is configured, this method will
334    /// return a `StorageError::Storage` error.
335    ///
336    /// The merge operation is atomic - all merges in the batch are applied
337    /// together or not at all.
338    ///
339    /// Uses `WriteOptions::default()` (`await_durable: false`).
340    async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
341        self.merge_with_options(records, WriteOptions::default())
342            .await
343    }
344
345    /// Merges values with custom write options.
346    ///
347    /// `options.await_durable` controls whether this call returns only after
348    /// the batch reaches durable storage.
349    ///
350    /// # Errors
351    ///
352    /// Returns an error if no merge operator is configured or if the merge
353    /// operation fails.
354    async fn merge_with_options(
355        &self,
356        records: Vec<MergeRecordOp>,
357        options: WriteOptions,
358    ) -> StorageResult<WriteResult>;
359
360    /// Creates a point-in-time snapshot of the storage.
361    ///
362    /// The snapshot provides a consistent read-only view of the database at the time
363    /// the snapshot was created. Reads from the snapshot will not see any subsequent
364    /// writes to the underlying storage.
365    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
366
367    /// Flushes all pending writes to durable storage.
368    ///
369    /// This ensures that all writes that have been acknowledged are persisted
370    /// to durable storage. For SlateDB, this flushes the memtable to the WAL
371    /// and object store.
372    ///
373    /// # Errors
374    ///
375    /// Returns an error if durability cannot be established.
376    async fn flush(&self) -> StorageResult<()>;
377
378    /// Subscribes to the durable sequence watermark.
379    ///
380    /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
381    /// confirmed durable by the storage engine. For SlateDB, this maps to
382    /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
383    /// "durable" so the watermark matches the latest written seqnum.
384    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
385
386    /// Creates a durable checkpoint of the current state.
387    ///
388    /// The returned [`CheckpointInfo::id`] can later be passed to a
389    /// `StorageReaderRuntime` to open a reader pinned to this exact view of
390    /// the database. For SlateDB, this maps to `Db::create_checkpoint` with
391    /// `CheckpointScope::All` (flushes WALs and freezes the memtable so the
392    /// checkpoint includes recent writes).
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if the underlying backend does not support
397    /// checkpoints, or if the checkpoint cannot be persisted.
398    async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo>;
399}