// File: common/storage/mod.rs

1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod metrics_recorder;
6pub mod slate;
7pub mod util;
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use uuid::Uuid;
14
15use crate::BytesRange;
16
17/// Identifies a checkpoint of a storage backend at a point in time.
18///
19/// Checkpoints capture a manifest snapshot that a reader can later open
20/// against to get a consistent view of the database at the time the
21/// checkpoint was created.
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub struct CheckpointInfo {
24    pub id: Uuid,
25    pub manifest_id: u64,
26}
27
/// Time-to-live policy attached to a put or merge.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Ttl {
    /// Use the default TTL (presumably the backend's configured default — confirm).
    /// This is the `Default` value of the enum.
    #[default]
    Default,
    /// The record never expires.
    NoExpiry,
    /// The record expires after the given duration.
    /// NOTE(review): the unit is not defined here (presumably milliseconds) — confirm.
    ExpireAfter(u64),
    /// The record expires at the given absolute timestamp.
    /// NOTE(review): epoch and unit are not defined here — confirm against the backend.
    ExpireAt(i64),
}
36
37#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
38pub struct PutOptions {
39    pub ttl: Ttl,
40}
41
42/// Encapsulates a record being put along with options specific to the put.
43#[derive(Clone, Debug)]
44pub struct PutRecordOp {
45    pub record: Record,
46    pub options: PutOptions,
47}
48
49impl PutRecordOp {
50    pub fn new(record: Record) -> Self {
51        Self {
52            record,
53            options: PutOptions::default(),
54        }
55    }
56
57    pub fn new_with_options(record: Record, options: PutOptions) -> Self {
58        Self { record, options }
59    }
60
61    pub fn with_options(self, options: PutOptions) -> Self {
62        Self {
63            record: self.record,
64            options,
65        }
66    }
67}
68
69/// Converts a Record to a PutRecordOp with default options
70impl From<Record> for PutRecordOp {
71    fn from(record: Record) -> Self {
72        Self::new(record)
73    }
74}
75
76#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
77pub struct MergeOptions {
78    pub ttl: Ttl,
79}
80
81/// Encapsulates a record written as part of a merge op along with options specific to the merge.
82#[derive(Clone, Debug)]
83pub struct MergeRecordOp {
84    pub record: Record,
85    pub options: MergeOptions,
86}
87
88impl MergeRecordOp {
89    pub fn new(record: Record) -> Self {
90        Self {
91            record,
92            options: MergeOptions::default(),
93        }
94    }
95
96    pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
97        Self { record, options }
98    }
99}
100
101/// Converts a Record to a PutRecordOp with default options
102impl From<Record> for MergeRecordOp {
103    fn from(record: Record) -> Self {
104        Self::new(record)
105    }
106}
107
108#[derive(Clone, Debug)]
109pub struct Record {
110    pub key: Bytes,
111    pub value: Bytes,
112}
113
114impl Record {
115    pub fn new(key: Bytes, value: Bytes) -> Self {
116        Self { key, value }
117    }
118
119    pub fn empty(key: Bytes) -> Self {
120        Self::new(key, Bytes::new())
121    }
122}
123
124#[derive(Clone, Debug)]
125pub enum RecordOp {
126    Put(PutRecordOp),
127    Merge(MergeRecordOp),
128    Delete(Bytes),
129}
130
/// Durability options for write operations.
///
/// Accepted by the `*_with_options` write methods on [`Storage`]
/// ([`Storage::apply_with_options`], [`Storage::put_with_options`],
/// [`Storage::merge_with_options`]). The option-less variants ([`Storage::put`]
/// and friends) use `WriteOptions::default()`.
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Whether the write call waits for durability before returning.
    ///
    /// With `true`, the call returns only once the data has been persisted to
    /// durable storage (e.g. flushed to the WAL and acknowledged by the object
    /// store).
    ///
    /// With `false` (the default), the call returns as soon as the data is in
    /// memory. This gives lower latency, but the write may be lost on a crash.
    pub await_durable: bool,
}
147
/// Errors returned by storage operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// An error originating in the storage backend.
    Storage(String),
    /// An internal error.
    Internal(String),
}

impl std::error::Error for StorageError {}

impl std::fmt::Display for StorageError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Storage(detail) => write!(f, "Storage error: {}", detail),
            Self::Internal(detail) => write!(f, "Internal error: {}", detail),
        }
    }
}

impl StorageError {
    /// Wraps any displayable error as [`StorageError::Storage`], keeping only
    /// its `Display` text.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        let detail = e.to_string();
        Self::Storage(detail)
    }
}

/// Shorthand for `Result<T, StorageError>`, returned by storage operations.
pub type StorageResult<T> = std::result::Result<T, StorageError>;
177
/// Outcome of a successful write: the sequence number the storage engine assigned to it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct WriteResult {
    /// Sequence number the underlying storage engine assigned to this write.
    pub seqnum: u64,
}
184
185/// Trait for merging existing values with new values.
186///
187/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
188/// This ensures consistent merging behavior regardless of the order of operations.
189pub trait MergeOperator: Send + Sync {
190    /// Merges a batch of operands with an optional existing value.
191    ///
192    /// # Arguments
193    /// * `key` - The key associated with the values being merged
194    /// * `existing_value` - The current value stored in the database (if any)
195    /// * `operands` - A slice of operands to merge, ordered from oldest to newest
196    fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
197}
198
199pub fn default_merge_batch(
200    key: &Bytes,
201    existing_value: Option<Bytes>,
202    operands: &[Bytes],
203    merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
204) -> Bytes {
205    let mut result = existing_value;
206    for operand in operands {
207        result = Some(merge(key, result, operand.clone()));
208    }
209    result.expect("merge_batch called with no existing value and no operands")
210}
211
212/// Iterator over storage records.
213#[async_trait]
214pub trait StorageIterator {
215    /// Returns the next record from this iterator.
216    ///
217    /// Returns `Ok(None)` when the iterator is exhausted.
218    async fn next(&mut self) -> StorageResult<Option<Record>>;
219}
220
221/// Common read operations supported by both Storage and StorageSnapshot.
222///
223/// This trait provides the core read methods that are shared between full storage
224/// access and point-in-time snapshots. By extracting these common operations,
225/// we can write code that works with both storage types.
226#[async_trait]
227pub trait StorageRead: Send + Sync {
228    /// Retrieves a single record by exact key.
229    ///
230    /// Returns `Ok(None)` if the key is not present.
231    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
232
233    /// Returns an iterator over records in the given range.
234    ///
235    /// The returned iterator is owned and does not borrow from the storage,
236    /// allowing it to be stored in structs or passed across await points.
237    async fn scan_iter(
238        &self,
239        range: BytesRange,
240    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
241
242    /// Collects all records in the range into a Vec.
243    #[tracing::instrument(level = "trace", skip_all)]
244    async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
245        let mut iter = self.scan_iter(range).await?;
246        let mut records = Vec::new();
247        while let Some(record) = iter.next().await? {
248            records.push(record);
249        }
250        Ok(records)
251    }
252
253    /// Closes the storage, releasing any resources.
254    ///
255    /// This method should be called before dropping the storage to ensure
256    /// proper cleanup. For SlateDB, this releases the database fence.
257    ///
258    /// # Errors
259    ///
260    /// Returns an error if resource shutdown fails.
261    async fn close(&self) -> StorageResult<()> {
262        Ok(())
263    }
264}
265
266/// A point-in-time snapshot of the storage layer.
267///
268/// Snapshots provide a consistent read-only view of the database at the time
269/// the snapshot was created. Reads from a snapshot will not see any subsequent
270/// writes to the underlying storage.
271#[async_trait]
272pub trait StorageSnapshot: StorageRead {}
273
274/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
275#[async_trait]
276pub trait Storage: StorageRead {
277    /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
278    ///
279    /// All operations in the batch are written together in a single `WriteBatch`,
280    /// so either all succeed or none are visible. This is the primary write method
281    /// used by flushers that produce a mix of operation types from a frozen delta.
282    ///
283    /// Uses `WriteOptions::default()` (`await_durable: false`).
284    async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
285        self.apply_with_options(ops, WriteOptions::default()).await
286    }
287
288    /// Applies a batch of mixed operations with custom write options.
289    ///
290    /// `options.await_durable` controls whether this call returns only after
291    /// the batch reaches durable storage.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the batch cannot be applied.
296    async fn apply_with_options(
297        &self,
298        ops: Vec<RecordOp>,
299        options: WriteOptions,
300    ) -> StorageResult<WriteResult>;
301
302    /// Writes records to storage.
303    ///
304    /// Uses `WriteOptions::default()` (`await_durable: false`).
305    async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
306        self.put_with_options(records, WriteOptions::default())
307            .await
308    }
309
310    /// Writes records to storage with custom options.
311    ///
312    /// This method allows control over durability behavior. Use this when you
313    /// need to specify whether to wait for writes to be durable.
314    ///
315    /// # Arguments
316    ///
317    /// * `records` - The records to write
318    /// * `options` - Write options controlling durability behavior
319    ///
320    /// # Errors
321    ///
322    /// Returns an error if any write in the batch fails.
323    async fn put_with_options(
324        &self,
325        records: Vec<PutRecordOp>,
326        options: WriteOptions,
327    ) -> StorageResult<WriteResult>;
328
329    /// Merges values for the given keys using the configured merge operator.
330    ///
331    /// This method requires the underlying storage engine to be configured with
332    /// a merge operator. If no merge operator is configured, this method will
333    /// return a `StorageError::Storage` error.
334    ///
335    /// The merge operation is atomic - all merges in the batch are applied
336    /// together or not at all.
337    ///
338    /// Uses `WriteOptions::default()` (`await_durable: false`).
339    async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
340        self.merge_with_options(records, WriteOptions::default())
341            .await
342    }
343
344    /// Merges values with custom write options.
345    ///
346    /// `options.await_durable` controls whether this call returns only after
347    /// the batch reaches durable storage.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if no merge operator is configured or if the merge
352    /// operation fails.
353    async fn merge_with_options(
354        &self,
355        records: Vec<MergeRecordOp>,
356        options: WriteOptions,
357    ) -> StorageResult<WriteResult>;
358
359    /// Creates a point-in-time snapshot of the storage.
360    ///
361    /// The snapshot provides a consistent read-only view of the database at the time
362    /// the snapshot was created. Reads from the snapshot will not see any subsequent
363    /// writes to the underlying storage.
364    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
365
366    /// Flushes all pending writes to durable storage.
367    ///
368    /// This ensures that all writes that have been acknowledged are persisted
369    /// to durable storage. For SlateDB, this flushes the memtable to the WAL
370    /// and object store.
371    ///
372    /// # Errors
373    ///
374    /// Returns an error if durability cannot be established.
375    async fn flush(&self) -> StorageResult<()>;
376
377    /// Subscribes to the durable sequence watermark.
378    ///
379    /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
380    /// confirmed durable by the storage engine. For SlateDB, this maps to
381    /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
382    /// "durable" so the watermark matches the latest written seqnum.
383    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
384
385    /// Creates a durable checkpoint of the current state.
386    ///
387    /// The returned [`CheckpointInfo::id`] can later be passed to a
388    /// `StorageReaderRuntime` to open a reader pinned to this exact view of
389    /// the database. For SlateDB, this maps to `Db::create_checkpoint` with
390    /// `CheckpointScope::All` (flushes WALs and freezes the memtable so the
391    /// checkpoint includes recent writes).
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the underlying backend does not support
396    /// checkpoints, or if the checkpoint cannot be persisted.
397    async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo>;
398}