common/storage/mod.rs
1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod metrics_recorder;
6pub mod slate;
7pub mod util;
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13
14use crate::BytesRange;
15
/// Time-to-live policy attached to a write.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Ttl {
    /// No explicit TTL requested; this is the value produced by `Ttl::default()`.
    ///
    /// NOTE(review): presumably defers to the storage engine's configured
    /// default TTL — confirm against the backend implementation.
    #[default]
    Default,
    /// The written entry does not expire.
    NoExpiry,
    /// The written entry expires after the given duration.
    ///
    /// NOTE(review): the unit of the `u64` is not stated in this module —
    /// confirm against the backend before relying on it.
    ExpireAfter(u64),
}
23
24#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
25pub struct PutOptions {
26 pub ttl: Ttl,
27}
28
29/// Encapsulates a record being put along with options specific to the put.
30#[derive(Clone, Debug)]
31pub struct PutRecordOp {
32 pub record: Record,
33 pub options: PutOptions,
34}
35
36impl PutRecordOp {
37 pub fn new(record: Record) -> Self {
38 Self {
39 record,
40 options: PutOptions::default(),
41 }
42 }
43
44 pub fn new_with_options(record: Record, options: PutOptions) -> Self {
45 Self { record, options }
46 }
47
48 pub fn with_options(self, options: PutOptions) -> Self {
49 Self {
50 record: self.record,
51 options,
52 }
53 }
54}
55
56/// Converts a Record to a PutRecordOp with default options
57impl From<Record> for PutRecordOp {
58 fn from(record: Record) -> Self {
59 Self::new(record)
60 }
61}
62
63#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
64pub struct MergeOptions {
65 pub ttl: Ttl,
66}
67
68/// Encapsulates a record written as part of a merge op along with options specific to the merge.
69#[derive(Clone, Debug)]
70pub struct MergeRecordOp {
71 pub record: Record,
72 pub options: MergeOptions,
73}
74
75impl MergeRecordOp {
76 pub fn new(record: Record) -> Self {
77 Self {
78 record,
79 options: MergeOptions::default(),
80 }
81 }
82
83 pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
84 Self { record, options }
85 }
86}
87
88/// Converts a Record to a PutRecordOp with default options
89impl From<Record> for MergeRecordOp {
90 fn from(record: Record) -> Self {
91 Self::new(record)
92 }
93}
94
95#[derive(Clone, Debug)]
96pub struct Record {
97 pub key: Bytes,
98 pub value: Bytes,
99}
100
101impl Record {
102 pub fn new(key: Bytes, value: Bytes) -> Self {
103 Self { key, value }
104 }
105
106 pub fn empty(key: Bytes) -> Self {
107 Self::new(key, Bytes::new())
108 }
109}
110
111#[derive(Clone, Debug)]
112pub enum RecordOp {
113 Put(PutRecordOp),
114 Merge(MergeRecordOp),
115 Delete(Bytes),
116}
117
/// Options for write operations.
///
/// Determines the durability behavior of writes such as [`Storage::put`]
/// and [`Storage::put_with_options`].
#[derive(Clone, Debug, Default)]
pub struct WriteOptions {
    /// Whether the write must be durable before the call returns.
    ///
    /// * `true` — the operation does not return until the data has been
    ///   persisted to durable storage (e.g., flushed to the WAL and
    ///   acknowledged by the object store).
    /// * `false` (the default) — the operation returns as soon as the data
    ///   is in memory. This lowers latency but risks data loss on crash.
    pub await_durable: bool,
}
134
/// Error type returned by storage operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// A storage-related failure, carrying a descriptive message.
    Storage(String),
    /// An internal failure, carrying a descriptive message.
    Internal(String),
}

impl std::fmt::Display for StorageError {
    /// Renders the error as `"<category> error: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Pick the category label first so the formatting happens in one place.
        let (category, detail) = match self {
            StorageError::Storage(detail) => ("Storage", detail),
            StorageError::Internal(detail) => ("Internal", detail),
        };
        write!(f, "{} error: {}", category, detail)
    }
}

impl std::error::Error for StorageError {}

impl StorageError {
    /// Wraps any displayable error as [`StorageError::Storage`].
    ///
    /// The message is the `Display` rendering of `e`.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        let message = e.to_string();
        StorageError::Storage(message)
    }
}
161
162/// Result type alias for storage operations
163pub type StorageResult<T> = std::result::Result<T, StorageError>;
164
/// Outcome of a write operation: the sequence number the storage engine assigned to it.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct WriteResult {
    /// Sequence number the underlying storage engine assigned to this write.
    pub seqnum: u64,
}
171
172/// Trait for merging existing values with new values.
173///
174/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
175/// This ensures consistent merging behavior regardless of the order of operations.
176pub trait MergeOperator: Send + Sync {
177 /// Merges a batch of operands with an optional existing value.
178 ///
179 /// # Arguments
180 /// * `key` - The key associated with the values being merged
181 /// * `existing_value` - The current value stored in the database (if any)
182 /// * `operands` - A slice of operands to merge, ordered from oldest to newest
183 fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
184}
185
186pub fn default_merge_batch(
187 key: &Bytes,
188 existing_value: Option<Bytes>,
189 operands: &[Bytes],
190 merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
191) -> Bytes {
192 let mut result = existing_value;
193 for operand in operands {
194 result = Some(merge(key, result, operand.clone()));
195 }
196 result.expect("merge_batch called with no existing value and no operands")
197}
198
199/// Iterator over storage records.
200#[async_trait]
201pub trait StorageIterator {
202 /// Returns the next record from this iterator.
203 ///
204 /// Returns `Ok(None)` when the iterator is exhausted.
205 async fn next(&mut self) -> StorageResult<Option<Record>>;
206}
207
208/// Common read operations supported by both Storage and StorageSnapshot.
209///
210/// This trait provides the core read methods that are shared between full storage
211/// access and point-in-time snapshots. By extracting these common operations,
212/// we can write code that works with both storage types.
213#[async_trait]
214pub trait StorageRead: Send + Sync {
215 /// Retrieves a single record by exact key.
216 ///
217 /// Returns `Ok(None)` if the key is not present.
218 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
219
220 /// Returns an iterator over records in the given range.
221 ///
222 /// The returned iterator is owned and does not borrow from the storage,
223 /// allowing it to be stored in structs or passed across await points.
224 async fn scan_iter(
225 &self,
226 range: BytesRange,
227 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
228
229 /// Collects all records in the range into a Vec.
230 #[tracing::instrument(level = "trace", skip_all)]
231 async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
232 let mut iter = self.scan_iter(range).await?;
233 let mut records = Vec::new();
234 while let Some(record) = iter.next().await? {
235 records.push(record);
236 }
237 Ok(records)
238 }
239
240 /// Closes the storage, releasing any resources.
241 ///
242 /// This method should be called before dropping the storage to ensure
243 /// proper cleanup. For SlateDB, this releases the database fence.
244 ///
245 /// # Errors
246 ///
247 /// Returns an error if resource shutdown fails.
248 async fn close(&self) -> StorageResult<()> {
249 Ok(())
250 }
251}
252
253/// A point-in-time snapshot of the storage layer.
254///
255/// Snapshots provide a consistent read-only view of the database at the time
256/// the snapshot was created. Reads from a snapshot will not see any subsequent
257/// writes to the underlying storage.
258#[async_trait]
259pub trait StorageSnapshot: StorageRead {}
260
261/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
262#[async_trait]
263pub trait Storage: StorageRead {
264 /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
265 ///
266 /// All operations in the batch are written together in a single `WriteBatch`,
267 /// so either all succeed or none are visible. This is the primary write method
268 /// used by flushers that produce a mix of operation types from a frozen delta.
269 ///
270 /// Uses `WriteOptions::default()` (`await_durable: false`).
271 async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
272 self.apply_with_options(ops, WriteOptions::default()).await
273 }
274
275 /// Applies a batch of mixed operations with custom write options.
276 ///
277 /// `options.await_durable` controls whether this call returns only after
278 /// the batch reaches durable storage.
279 ///
280 /// # Errors
281 ///
282 /// Returns an error if the batch cannot be applied.
283 async fn apply_with_options(
284 &self,
285 ops: Vec<RecordOp>,
286 options: WriteOptions,
287 ) -> StorageResult<WriteResult>;
288
289 /// Writes records to storage.
290 ///
291 /// Uses `WriteOptions::default()` (`await_durable: false`).
292 async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
293 self.put_with_options(records, WriteOptions::default())
294 .await
295 }
296
297 /// Writes records to storage with custom options.
298 ///
299 /// This method allows control over durability behavior. Use this when you
300 /// need to specify whether to wait for writes to be durable.
301 ///
302 /// # Arguments
303 ///
304 /// * `records` - The records to write
305 /// * `options` - Write options controlling durability behavior
306 ///
307 /// # Errors
308 ///
309 /// Returns an error if any write in the batch fails.
310 async fn put_with_options(
311 &self,
312 records: Vec<PutRecordOp>,
313 options: WriteOptions,
314 ) -> StorageResult<WriteResult>;
315
316 /// Merges values for the given keys using the configured merge operator.
317 ///
318 /// This method requires the underlying storage engine to be configured with
319 /// a merge operator. If no merge operator is configured, this method will
320 /// return a `StorageError::Storage` error.
321 ///
322 /// The merge operation is atomic - all merges in the batch are applied
323 /// together or not at all.
324 ///
325 /// Uses `WriteOptions::default()` (`await_durable: false`).
326 async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
327 self.merge_with_options(records, WriteOptions::default())
328 .await
329 }
330
331 /// Merges values with custom write options.
332 ///
333 /// `options.await_durable` controls whether this call returns only after
334 /// the batch reaches durable storage.
335 ///
336 /// # Errors
337 ///
338 /// Returns an error if no merge operator is configured or if the merge
339 /// operation fails.
340 async fn merge_with_options(
341 &self,
342 records: Vec<MergeRecordOp>,
343 options: WriteOptions,
344 ) -> StorageResult<WriteResult>;
345
346 /// Creates a point-in-time snapshot of the storage.
347 ///
348 /// The snapshot provides a consistent read-only view of the database at the time
349 /// the snapshot was created. Reads from the snapshot will not see any subsequent
350 /// writes to the underlying storage.
351 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
352
353 /// Flushes all pending writes to durable storage.
354 ///
355 /// This ensures that all writes that have been acknowledged are persisted
356 /// to durable storage. For SlateDB, this flushes the memtable to the WAL
357 /// and object store.
358 ///
359 /// # Errors
360 ///
361 /// Returns an error if durability cannot be established.
362 async fn flush(&self) -> StorageResult<()>;
363
364 /// Subscribes to the durable sequence watermark.
365 ///
366 /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
367 /// confirmed durable by the storage engine. For SlateDB, this maps to
368 /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
369 /// "durable" so the watermark matches the latest written seqnum.
370 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
371}