Skip to main content

common/storage/
mod.rs

1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod slate;
6pub mod util;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12
13use crate::BytesRange;
14
15#[derive(Clone, Debug)]
16pub struct Record {
17    pub key: Bytes,
18    pub value: Bytes,
19}
20
21impl Record {
22    pub fn new(key: Bytes, value: Bytes) -> Self {
23        Self { key, value }
24    }
25
26    pub fn empty(key: Bytes) -> Self {
27        Self::new(key, Bytes::new())
28    }
29}
30
31#[derive(Clone, Debug)]
32pub enum RecordOp {
33    Put(Record),
34    Merge(Record),
35    Delete(Bytes),
36}
37
/// Tunable settings for a storage write.
///
/// Pass these to [`Storage::put_with_options`] to choose the durability
/// behavior of that write. The derived [`Default`] leaves every flag off.
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Whether the call should block until the write is durable.
    ///
    /// * `true` — the operation returns only after the data has been persisted
    ///   to durable storage (e.g., flushed to the WAL and acknowledged by the
    ///   object store).
    /// * `false` (the default) — the operation returns as soon as the data is
    ///   in memory. Latency is lower, but the write can be lost on a crash.
    pub await_durable: bool,
}
54
/// Failure reported by a storage operation.
///
/// Both variants carry a human-readable message; the variant only records
/// which side the failure is attributed to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// Storage-related failure.
    Storage(String),
    /// Internal failure.
    Internal(String),
}

impl std::error::Error for StorageError {}

impl std::fmt::Display for StorageError {
    /// Renders the error as `"<category> error: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label and payload first so the output shape is written once.
        let (label, detail) = match self {
            Self::Storage(detail) => ("Storage error", detail),
            Self::Internal(detail) => ("Internal error", detail),
        };
        write!(f, "{label}: {detail}")
    }
}

impl StorageError {
    /// Wraps any displayable value as a [`StorageError::Storage`], using its
    /// `Display` output as the message.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        let message = e.to_string();
        Self::Storage(message)
    }
}

/// Shorthand for a `Result` whose error type is [`StorageError`].
pub type StorageResult<T> = std::result::Result<T, StorageError>;
84
85/// Trait for merging existing values with new values.
86///
87/// Merge operators must be associative: `merge(merge(a, b), c) == merge(a, merge(b, c))`.
88/// This ensures consistent merging behavior regardless of the order of operations.
89pub trait MergeOperator: Send + Sync {
90    /// Merges an existing value with a new value to produce a merged result.
91    ///
92    /// # Arguments
93    /// * `key` - The key associated with the values being merged
94    /// * `existing_value` - The current value stored in the database (if any)
95    /// * `new_value` - The new value to merge with the existing value
96    ///
97    /// # Returns
98    /// The merged value.
99    fn merge(&self, key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes;
100}
101
102/// Iterator over storage records.
103#[async_trait]
104pub trait StorageIterator {
105    async fn next(&mut self) -> StorageResult<Option<Record>>;
106}
107
108/// Common read operations supported by both Storage and StorageSnapshot.
109///
110/// This trait provides the core read methods that are shared between full storage
111/// access and point-in-time snapshots. By extracting these common operations,
112/// we can write code that works with both storage types.
113#[async_trait]
114pub trait StorageRead: Send + Sync {
115    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
116
117    /// Returns an iterator over records in the given range.
118    ///
119    /// The returned iterator is owned and does not borrow from the storage,
120    /// allowing it to be stored in structs or passed across await points.
121    async fn scan_iter(
122        &self,
123        range: BytesRange,
124    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
125
126    /// Collects all records in the range into a Vec.
127    #[tracing::instrument(level = "trace", skip_all)]
128    async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
129        let mut iter = self.scan_iter(range).await?;
130        let mut records = Vec::new();
131        while let Some(record) = iter.next().await? {
132            records.push(record);
133        }
134        Ok(records)
135    }
136}
137
138/// A point-in-time snapshot of the storage layer.
139///
140/// Snapshots provide a consistent read-only view of the database at the time
141/// the snapshot was created. Reads from a snapshot will not see any subsequent
142/// writes to the underlying storage.
143#[async_trait]
144pub trait StorageSnapshot: StorageRead {}
145
146/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
147#[async_trait]
148pub trait Storage: StorageRead {
149    async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<()>;
150
151    async fn put(&self, records: Vec<Record>) -> StorageResult<()>;
152
153    /// Writes records to storage with custom options.
154    ///
155    /// This method allows control over durability behavior. Use this when you
156    /// need to specify whether to wait for writes to be durable.
157    ///
158    /// # Arguments
159    ///
160    /// * `records` - The records to write
161    /// * `options` - Write options controlling durability behavior
162    async fn put_with_options(
163        &self,
164        records: Vec<Record>,
165        options: WriteOptions,
166    ) -> StorageResult<()>;
167
168    /// Merges values for the given keys using the configured merge operator.
169    ///
170    /// This method requires the underlying storage engine to be configured with
171    /// a merge operator. If no merge operator is configured, this method will
172    /// return a `StorageError::Storage` error.
173    ///
174    /// The merge operation is atomic - all merges in the batch are applied
175    /// together or not at all.
176    async fn merge(&self, records: Vec<Record>) -> StorageResult<()>;
177
178    /// Creates a point-in-time snapshot of the storage.
179    ///
180    /// The snapshot provides a consistent read-only view of the database at the time
181    /// the snapshot was created. Reads from the snapshot will not see any subsequent
182    /// writes to the underlying storage.
183    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
184
185    /// Flushes all pending writes to durable storage.
186    ///
187    /// This ensures that all writes that have been acknowledged are persisted
188    /// to durable storage. For SlateDB, this flushes the memtable to the WAL
189    /// and object store.
190    async fn flush(&self) -> StorageResult<()>;
191
192    /// Closes the storage, releasing any resources.
193    ///
194    /// This method should be called before dropping the storage to ensure
195    /// proper cleanup. For SlateDB, this releases the database fence.
196    async fn close(&self) -> StorageResult<()>;
197}