Skip to main content

common/storage/
mod.rs

1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod slate;
6pub mod util;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12
13use crate::BytesRange;
14
/// Time-to-live policy attached to a written record.
///
/// NOTE(review): how each variant is honoured is decided by the storage
/// backend, which is not visible from this module.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Ttl {
    /// The value produced by `Ttl::default()`.
    ///
    /// Presumably defers to whatever TTL the backend is configured with —
    /// TODO confirm against the backend implementation.
    #[default]
    Default,
    /// Presumably: the record never expires — TODO confirm.
    NoExpiry,
    /// Presumably: the record expires after the given amount of time.
    ///
    /// The unit of the `u64` is not stated in this module — TODO confirm.
    ExpireAfter(u64),
}
22
23#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
24pub struct PutOptions {
25    pub ttl: Ttl,
26}
27
28/// Encapsulates a record being put along with options specific to the put.
29#[derive(Clone, Debug)]
30pub struct PutRecordOp {
31    pub record: Record,
32    pub options: PutOptions,
33}
34
35impl PutRecordOp {
36    pub fn new(record: Record) -> Self {
37        Self {
38            record,
39            options: PutOptions::default(),
40        }
41    }
42
43    pub fn new_with_options(record: Record, options: PutOptions) -> Self {
44        Self { record, options }
45    }
46
47    pub fn with_options(self, options: PutOptions) -> Self {
48        Self {
49            record: self.record,
50            options,
51        }
52    }
53}
54
55/// Converts a Record to a PutRecordOp with default options
56impl From<Record> for PutRecordOp {
57    fn from(record: Record) -> Self {
58        Self::new(record)
59    }
60}
61
62#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
63pub struct MergeOptions {
64    pub ttl: Ttl,
65}
66
67/// Encapsulates a record written as part of a merge op along with options specific to the merge.
68#[derive(Clone, Debug)]
69pub struct MergeRecordOp {
70    pub record: Record,
71    pub options: MergeOptions,
72}
73
74impl MergeRecordOp {
75    pub fn new(record: Record) -> Self {
76        Self {
77            record,
78            options: MergeOptions::default(),
79        }
80    }
81
82    pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
83        Self { record, options }
84    }
85}
86
87/// Converts a Record to a PutRecordOp with default options
88impl From<Record> for MergeRecordOp {
89    fn from(record: Record) -> Self {
90        Self::new(record)
91    }
92}
93
94#[derive(Clone, Debug)]
95pub struct Record {
96    pub key: Bytes,
97    pub value: Bytes,
98}
99
100impl Record {
101    pub fn new(key: Bytes, value: Bytes) -> Self {
102        Self { key, value }
103    }
104
105    pub fn empty(key: Bytes) -> Self {
106        Self::new(key, Bytes::new())
107    }
108}
109
110#[derive(Clone, Debug)]
111pub enum RecordOp {
112    Put(PutRecordOp),
113    Merge(MergeRecordOp),
114    Delete(Bytes),
115}
116
/// Batch-level options for a write.
///
/// These govern durability for write calls such as [`Storage::put`] and
/// [`Storage::put_with_options`].
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// If `true`, the write call returns only once the data has been persisted
    /// to durable storage (e.g., flushed to the WAL and acknowledged by the
    /// object store).
    ///
    /// If `false` — the default — the call returns as soon as the data is in
    /// memory. That lowers latency, but the data may be lost on a crash.
    pub await_durable: bool,
}
133
/// Errors produced by storage operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// A storage-related failure, carrying a human-readable message.
    Storage(String),
    /// An internal failure, carrying a human-readable message.
    Internal(String),
}

impl StorageError {
    /// Wraps anything displayable as a [`StorageError::Storage`], using its
    /// `Display` output as the message.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        let message = e.to_string();
        Self::Storage(message)
    }
}

impl std::fmt::Display for StorageError {
    /// Renders the message prefixed with the kind of error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Storage(detail) => write!(f, "Storage error: {}", detail),
            Self::Internal(detail) => write!(f, "Internal error: {}", detail),
        }
    }
}

impl std::error::Error for StorageError {}
160
161/// Result type alias for storage operations
162pub type StorageResult<T> = std::result::Result<T, StorageError>;
163
/// Outcome of a successful write: the sequence number the storage engine assigned to it.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct WriteResult {
    /// Sequence number assigned to this write by the underlying storage engine.
    pub seqnum: u64,
}
170
171/// Trait for merging existing values with new values.
172///
173/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
174/// This ensures consistent merging behavior regardless of the order of operations.
175pub trait MergeOperator: Send + Sync {
176    /// Merges a batch of operands with an optional existing value.
177    ///
178    /// # Arguments
179    /// * `key` - The key associated with the values being merged
180    /// * `existing_value` - The current value stored in the database (if any)
181    /// * `operands` - A slice of operands to merge, ordered from oldest to newest
182    fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
183}
184
185pub fn default_merge_batch(
186    key: &Bytes,
187    existing_value: Option<Bytes>,
188    operands: &[Bytes],
189    merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
190) -> Bytes {
191    let mut result = existing_value;
192    for operand in operands {
193        result = Some(merge(key, result, operand.clone()));
194    }
195    result.expect("merge_batch called with no existing value and no operands")
196}
197
198/// Iterator over storage records.
199#[async_trait]
200pub trait StorageIterator {
201    /// Returns the next record from this iterator.
202    ///
203    /// Returns `Ok(None)` when the iterator is exhausted.
204    async fn next(&mut self) -> StorageResult<Option<Record>>;
205}
206
207/// Common read operations supported by both Storage and StorageSnapshot.
208///
209/// This trait provides the core read methods that are shared between full storage
210/// access and point-in-time snapshots. By extracting these common operations,
211/// we can write code that works with both storage types.
212#[async_trait]
213pub trait StorageRead: Send + Sync {
214    /// Retrieves a single record by exact key.
215    ///
216    /// Returns `Ok(None)` if the key is not present.
217    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
218
219    /// Returns an iterator over records in the given range.
220    ///
221    /// The returned iterator is owned and does not borrow from the storage,
222    /// allowing it to be stored in structs or passed across await points.
223    async fn scan_iter(
224        &self,
225        range: BytesRange,
226    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
227
228    /// Collects all records in the range into a Vec.
229    #[tracing::instrument(level = "trace", skip_all)]
230    async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
231        let mut iter = self.scan_iter(range).await?;
232        let mut records = Vec::new();
233        while let Some(record) = iter.next().await? {
234            records.push(record);
235        }
236        Ok(records)
237    }
238}
239
240/// A point-in-time snapshot of the storage layer.
241///
242/// Snapshots provide a consistent read-only view of the database at the time
243/// the snapshot was created. Reads from a snapshot will not see any subsequent
244/// writes to the underlying storage.
245#[async_trait]
246pub trait StorageSnapshot: StorageRead {}
247
248/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
249#[async_trait]
250pub trait Storage: StorageRead {
251    /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
252    ///
253    /// All operations in the batch are written together in a single `WriteBatch`,
254    /// so either all succeed or none are visible. This is the primary write method
255    /// used by flushers that produce a mix of operation types from a frozen delta.
256    ///
257    /// Uses `WriteOptions::default()` (`await_durable: false`).
258    async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
259        self.apply_with_options(ops, WriteOptions::default()).await
260    }
261
262    /// Applies a batch of mixed operations with custom write options.
263    ///
264    /// `options.await_durable` controls whether this call returns only after
265    /// the batch reaches durable storage.
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the batch cannot be applied.
270    async fn apply_with_options(
271        &self,
272        ops: Vec<RecordOp>,
273        options: WriteOptions,
274    ) -> StorageResult<WriteResult>;
275
276    /// Writes records to storage.
277    ///
278    /// Uses `WriteOptions::default()` (`await_durable: false`).
279    async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
280        self.put_with_options(records, WriteOptions::default())
281            .await
282    }
283
284    /// Writes records to storage with custom options.
285    ///
286    /// This method allows control over durability behavior. Use this when you
287    /// need to specify whether to wait for writes to be durable.
288    ///
289    /// # Arguments
290    ///
291    /// * `records` - The records to write
292    /// * `options` - Write options controlling durability behavior
293    ///
294    /// # Errors
295    ///
296    /// Returns an error if any write in the batch fails.
297    async fn put_with_options(
298        &self,
299        records: Vec<PutRecordOp>,
300        options: WriteOptions,
301    ) -> StorageResult<WriteResult>;
302
303    /// Merges values for the given keys using the configured merge operator.
304    ///
305    /// This method requires the underlying storage engine to be configured with
306    /// a merge operator. If no merge operator is configured, this method will
307    /// return a `StorageError::Storage` error.
308    ///
309    /// The merge operation is atomic - all merges in the batch are applied
310    /// together or not at all.
311    ///
312    /// Uses `WriteOptions::default()` (`await_durable: false`).
313    async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
314        self.merge_with_options(records, WriteOptions::default())
315            .await
316    }
317
318    /// Merges values with custom write options.
319    ///
320    /// `options.await_durable` controls whether this call returns only after
321    /// the batch reaches durable storage.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if no merge operator is configured or if the merge
326    /// operation fails.
327    async fn merge_with_options(
328        &self,
329        records: Vec<MergeRecordOp>,
330        options: WriteOptions,
331    ) -> StorageResult<WriteResult>;
332
333    /// Creates a point-in-time snapshot of the storage.
334    ///
335    /// The snapshot provides a consistent read-only view of the database at the time
336    /// the snapshot was created. Reads from the snapshot will not see any subsequent
337    /// writes to the underlying storage.
338    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
339
340    /// Flushes all pending writes to durable storage.
341    ///
342    /// This ensures that all writes that have been acknowledged are persisted
343    /// to durable storage. For SlateDB, this flushes the memtable to the WAL
344    /// and object store.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if durability cannot be established.
349    async fn flush(&self) -> StorageResult<()>;
350
351    /// Closes the storage, releasing any resources.
352    ///
353    /// This method should be called before dropping the storage to ensure
354    /// proper cleanup. For SlateDB, this releases the database fence.
355    ///
356    /// # Errors
357    ///
358    /// Returns an error if resource shutdown fails.
359    async fn close(&self) -> StorageResult<()>;
360
361    /// Subscribes to the durable sequence watermark.
362    ///
363    /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
364    /// confirmed durable by the storage engine. For SlateDB, this maps to
365    /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
366    /// "durable" so the watermark matches the latest written seqnum.
367    fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
368
369    /// Registers storage engine metrics into the given Prometheus registry.
370    ///
371    /// The default implementation is a no-op. Storage backends that expose
372    /// internal metrics (e.g., SlateDB) override this to register gauges
373    /// that read live values on each scrape.
374    #[cfg(feature = "metrics")]
375    fn register_metrics(&self, _registry: &mut prometheus_client::registry::Registry) {}
376}