Skip to main content

common/storage/
mod.rs

1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod metrics_recorder;
6pub mod slate;
7pub mod util;
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13
14use crate::BytesRange;
15
/// Time-to-live policy attached to a written record.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Ttl {
    /// No explicit policy was requested; this is the value produced by
    /// `Ttl::default()`.
    // NOTE(review): presumably defers to the storage engine's configured
    // default TTL — confirm against the backend implementation.
    #[default]
    Default,
    /// The record should never expire.
    NoExpiry,
    /// The record should expire after the given duration.
    // NOTE(review): the unit of the `u64` is not defined in this module —
    // confirm against the backend before relying on it.
    ExpireAfter(u64),
}
23
24#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
25pub struct PutOptions {
26    pub ttl: Ttl,
27}
28
29/// Encapsulates a record being put along with options specific to the put.
30#[derive(Clone, Debug)]
31pub struct PutRecordOp {
32    pub record: Record,
33    pub options: PutOptions,
34}
35
36impl PutRecordOp {
37    pub fn new(record: Record) -> Self {
38        Self {
39            record,
40            options: PutOptions::default(),
41        }
42    }
43
44    pub fn new_with_options(record: Record, options: PutOptions) -> Self {
45        Self { record, options }
46    }
47
48    pub fn with_options(self, options: PutOptions) -> Self {
49        Self {
50            record: self.record,
51            options,
52        }
53    }
54}
55
56/// Converts a Record to a PutRecordOp with default options
57impl From<Record> for PutRecordOp {
58    fn from(record: Record) -> Self {
59        Self::new(record)
60    }
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub struct MergeOptions {
65    pub ttl: Ttl,
66}
67
68/// Encapsulates a record written as part of a merge op along with options specific to the merge.
69#[derive(Clone, Debug)]
70pub struct MergeRecordOp {
71    pub record: Record,
72    pub options: MergeOptions,
73}
74
75impl MergeRecordOp {
76    pub fn new(record: Record) -> Self {
77        Self {
78            record,
79            options: MergeOptions::default(),
80        }
81    }
82
83    pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
84        Self { record, options }
85    }
86}
87
88/// Converts a Record to a PutRecordOp with default options
89impl From<Record> for MergeRecordOp {
90    fn from(record: Record) -> Self {
91        Self::new(record)
92    }
93}
94
95#[derive(Clone, Debug)]
96pub struct Record {
97    pub key: Bytes,
98    pub value: Bytes,
99}
100
101impl Record {
102    pub fn new(key: Bytes, value: Bytes) -> Self {
103        Self { key, value }
104    }
105
106    pub fn empty(key: Bytes) -> Self {
107        Self::new(key, Bytes::new())
108    }
109}
110
111#[derive(Clone, Debug)]
112pub enum RecordOp {
113    Put(PutRecordOp),
114    Merge(MergeRecordOp),
115    Delete(Bytes),
116}
117
/// Batch-level options for write operations.
///
/// Governs durability for the `*_with_options` write methods, such as
/// [`Storage::put_with_options`], [`Storage::merge_with_options`] and
/// [`Storage::apply_with_options`].
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Whether the call should block until the write is durable.
    ///
    /// * `true` — the operation returns only once the data has been persisted
    ///   to durable storage (e.g., flushed to the WAL and acknowledged by the
    ///   object store).
    /// * `false` (the default) — the operation returns as soon as the data is
    ///   in memory. This lowers latency but risks losing the write if the
    ///   process crashes.
    pub await_durable: bool,
}
134
/// Errors produced by storage operations.
///
/// Both variants carry a human-readable message; the variant only records
/// where the failure originated.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// A failure reported by the storage layer.
    Storage(String),
    /// An internal failure.
    Internal(String),
}

impl std::fmt::Display for StorageError {
    /// Renders the error as `"Storage error: <msg>"` or
    /// `"Internal error: <msg>"`, depending on the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Storage(reason) => f.write_fmt(format_args!("Storage error: {}", reason)),
            Self::Internal(reason) => f.write_fmt(format_args!("Internal error: {}", reason)),
        }
    }
}

impl std::error::Error for StorageError {}

impl StorageError {
    /// Wraps any displayable error as [`StorageError::Storage`], using the
    /// error's `Display` output as the message.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        let message = e.to_string();
        Self::Storage(message)
    }
}

/// Shorthand for a `Result` whose error type is [`StorageError`].
pub type StorageResult<T> = std::result::Result<T, StorageError>;
164
/// Outcome of a successful write.
///
/// Carries the sequence number the storage engine assigned to the write.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct WriteResult {
    /// Sequence number assigned to this write by the underlying storage engine.
    pub seqnum: u64,
}
171
172/// Trait for merging existing values with new values.
173///
174/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
175/// This ensures consistent merging behavior regardless of the order of operations.
176pub trait MergeOperator: Send + Sync {
177    /// Merges a batch of operands with an optional existing value.
178    ///
179    /// # Arguments
180    /// * `key` - The key associated with the values being merged
181    /// * `existing_value` - The current value stored in the database (if any)
182    /// * `operands` - A slice of operands to merge, ordered from oldest to newest
183    fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
184}
185
186pub fn default_merge_batch(
187    key: &Bytes,
188    existing_value: Option<Bytes>,
189    operands: &[Bytes],
190    merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
191) -> Bytes {
192    let mut result = existing_value;
193    for operand in operands {
194        result = Some(merge(key, result, operand.clone()));
195    }
196    result.expect("merge_batch called with no existing value and no operands")
197}
198
199/// Iterator over storage records.
200#[async_trait]
201pub trait StorageIterator {
202    /// Returns the next record from this iterator.
203    ///
204    /// Returns `Ok(None)` when the iterator is exhausted.
205    async fn next(&mut self) -> StorageResult<Option<Record>>;
206}
207
208/// Common read operations supported by both Storage and StorageSnapshot.
209///
210/// This trait provides the core read methods that are shared between full storage
211/// access and point-in-time snapshots. By extracting these common operations,
212/// we can write code that works with both storage types.
213#[async_trait]
214pub trait StorageRead: Send + Sync {
215    /// Retrieves a single record by exact key.
216    ///
217    /// Returns `Ok(None)` if the key is not present.
218    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
219
220    /// Returns an iterator over records in the given range.
221    ///
222    /// The returned iterator is owned and does not borrow from the storage,
223    /// allowing it to be stored in structs or passed across await points.
224    async fn scan_iter(
225        &self,
226        range: BytesRange,
227    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
228
229    /// Collects all records in the range into a Vec.
230    #[tracing::instrument(level = "trace", skip_all)]
231    async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
232        let mut iter = self.scan_iter(range).await?;
233        let mut records = Vec::new();
234        while let Some(record) = iter.next().await? {
235            records.push(record);
236        }
237        Ok(records)
238    }
239
240    /// Closes the storage, releasing any resources.
241    ///
242    /// This method should be called before dropping the storage to ensure
243    /// proper cleanup. For SlateDB, this releases the database fence.
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if resource shutdown fails.
248    async fn close(&self) -> StorageResult<()> {
249        Ok(())
250    }
251}
252
253/// A point-in-time snapshot of the storage layer.
254///
255/// Snapshots provide a consistent read-only view of the database at the time
256/// the snapshot was created. Reads from a snapshot will not see any subsequent
257/// writes to the underlying storage.
258#[async_trait]
259pub trait StorageSnapshot: StorageRead {}
260
261/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
262#[async_trait]
263pub trait Storage: StorageRead {
264    /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
265    ///
266    /// All operations in the batch are written together in a single `WriteBatch`,
267    /// so either all succeed or none are visible. This is the primary write method
268    /// used by flushers that produce a mix of operation types from a frozen delta.
269    ///
270    /// Uses `WriteOptions::default()` (`await_durable: false`).
271    async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
272        self.apply_with_options(ops, WriteOptions::default()).await
273    }
274
275    /// Applies a batch of mixed operations with custom write options.
276    ///
277    /// `options.await_durable` controls whether this call returns only after
278    /// the batch reaches durable storage.
279    ///
280    /// # Errors
281    ///
282    /// Returns an error if the batch cannot be applied.
283    async fn apply_with_options(
284        &self,
285        ops: Vec<RecordOp>,
286        options: WriteOptions,
287    ) -> StorageResult<WriteResult>;
288
289    /// Writes records to storage.
290    ///
291    /// Uses `WriteOptions::default()` (`await_durable: false`).
292    async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
293        self.put_with_options(records, WriteOptions::default())
294            .await
295    }
296
297    /// Writes records to storage with custom options.
298    ///
299    /// This method allows control over durability behavior. Use this when you
300    /// need to specify whether to wait for writes to be durable.
301    ///
302    /// # Arguments
303    ///
304    /// * `records` - The records to write
305    /// * `options` - Write options controlling durability behavior
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if any write in the batch fails.
310    async fn put_with_options(
311        &self,
312        records: Vec<PutRecordOp>,
313        options: WriteOptions,
314    ) -> StorageResult<WriteResult>;
315
316    /// Merges values for the given keys using the configured merge operator.
317    ///
318    /// This method requires the underlying storage engine to be configured with
319    /// a merge operator. If no merge operator is configured, this method will
320    /// return a `StorageError::Storage` error.
321    ///
322    /// The merge operation is atomic - all merges in the batch are applied
323    /// together or not at all.
324    ///
325    /// Uses `WriteOptions::default()` (`await_durable: false`).
326    async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
327        self.merge_with_options(records, WriteOptions::default())
328            .await
329    }
330
331    /// Merges values with custom write options.
332    ///
333    /// `options.await_durable` controls whether this call returns only after
334    /// the batch reaches durable storage.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if no merge operator is configured or if the merge
339    /// operation fails.
340    async fn merge_with_options(
341        &self,
342        records: Vec<MergeRecordOp>,
343        options: WriteOptions,
344    ) -> StorageResult<WriteResult>;
345
346    /// Creates a point-in-time snapshot of the storage.
347    ///
348    /// The snapshot provides a consistent read-only view of the database at the time
349    /// the snapshot was created. Reads from the snapshot will not see any subsequent
350    /// writes to the underlying storage.
351    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
352
353    /// Flushes all pending writes to durable storage.
354    ///
355    /// This ensures that all writes that have been acknowledged are persisted
356    /// to durable storage. For SlateDB, this flushes the memtable to the WAL
357    /// and object store.
358    ///
359    /// # Errors
360    ///
361    /// Returns an error if durability cannot be established.
362    async fn flush(&self) -> StorageResult<()>;
363
364    /// Subscribes to the durable sequence watermark.
365    ///
366    /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
367    /// confirmed durable by the storage engine. For SlateDB, this maps to
368    /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
369    /// "durable" so the watermark matches the latest written seqnum.
370    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
371}