common/storage/mod.rs
1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod metrics_recorder;
6pub mod slate;
7pub mod util;
8
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use uuid::Uuid;
14
15use crate::BytesRange;
16
17/// Identifies a checkpoint of a storage backend at a point in time.
18///
19/// Checkpoints capture a manifest snapshot that a reader can later open
20/// against to get a consistent view of the database at the time the
21/// checkpoint was created.
22#[derive(Debug, Clone, Copy, PartialEq, Eq)]
23pub struct CheckpointInfo {
24 pub id: Uuid,
25 pub manifest_id: u64,
26}
27
/// Time-to-live policy attached to a written record.
///
/// NOTE(review): the precise semantics and units of each variant are decided
/// by the storage backend and are not visible in this module — confirm there.
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum Ttl {
    /// No explicit choice was made; this is what `Ttl::default()` returns.
    #[default]
    Default,
    /// The record is written without an expiry.
    NoExpiry,
    /// Expire after the given duration (units not stated here — TODO confirm).
    ExpireAfter(u64),
    /// Expire at the given instant (timestamp encoding not stated here — TODO confirm).
    ExpireAt(i64),
}
36
37#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
38pub struct PutOptions {
39 pub ttl: Ttl,
40}
41
42/// Encapsulates a record being put along with options specific to the put.
43#[derive(Clone, Debug)]
44pub struct PutRecordOp {
45 pub record: Record,
46 pub options: PutOptions,
47}
48
49impl PutRecordOp {
50 pub fn new(record: Record) -> Self {
51 Self {
52 record,
53 options: PutOptions::default(),
54 }
55 }
56
57 pub fn new_with_options(record: Record, options: PutOptions) -> Self {
58 Self { record, options }
59 }
60
61 pub fn with_options(self, options: PutOptions) -> Self {
62 Self {
63 record: self.record,
64 options,
65 }
66 }
67}
68
69/// Converts a Record to a PutRecordOp with default options
70impl From<Record> for PutRecordOp {
71 fn from(record: Record) -> Self {
72 Self::new(record)
73 }
74}
75
76#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
77pub struct MergeOptions {
78 pub ttl: Ttl,
79}
80
81/// Encapsulates a record written as part of a merge op along with options specific to the merge.
82#[derive(Clone, Debug)]
83pub struct MergeRecordOp {
84 pub record: Record,
85 pub options: MergeOptions,
86}
87
88impl MergeRecordOp {
89 pub fn new(record: Record) -> Self {
90 Self {
91 record,
92 options: MergeOptions::default(),
93 }
94 }
95
96 pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
97 Self { record, options }
98 }
99}
100
101/// Converts a Record to a PutRecordOp with default options
102impl From<Record> for MergeRecordOp {
103 fn from(record: Record) -> Self {
104 Self::new(record)
105 }
106}
107
108#[derive(Clone, Debug)]
109pub struct Record {
110 pub key: Bytes,
111 pub value: Bytes,
112}
113
114impl Record {
115 pub fn new(key: Bytes, value: Bytes) -> Self {
116 Self { key, value }
117 }
118
119 pub fn empty(key: Bytes) -> Self {
120 Self::new(key, Bytes::new())
121 }
122}
123
124#[derive(Clone, Debug)]
125pub enum RecordOp {
126 Put(PutRecordOp),
127 Merge(MergeRecordOp),
128 Delete(Bytes),
129}
130
/// Options for write operations.
///
/// Controls the durability behavior of the `*_with_options` write methods:
/// [`Storage::apply_with_options`], [`Storage::put_with_options`] and
/// [`Storage::merge_with_options`]. The shorthand methods [`Storage::apply`],
/// [`Storage::put`] and [`Storage::merge`] use `WriteOptions::default()`.
///
/// Like the other option structs in this module it can be compared with `==`,
/// which makes it easy to assert on in tests and mocks.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WriteOptions {
    /// Whether to wait for the write to be durable before returning.
    ///
    /// When `true`, the operation will not return until the data has been
    /// persisted to durable storage (e.g., flushed to the WAL and acknowledged
    /// by the object store).
    ///
    /// When `false` (the default), the operation returns as soon as the data
    /// is in memory, providing lower latency but risking data loss on crash.
    pub await_durable: bool,
}
147
/// Failure reported by a storage operation.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum StorageError {
    /// Storage-related failure, carrying a human-readable message.
    Storage(String),
    /// Internal failure, carrying a human-readable message.
    Internal(String),
}

impl std::fmt::Display for StorageError {
    /// Writes the variant's label followed by its message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Storage(detail) => write!(f, "Storage error: {}", detail),
            Self::Internal(detail) => write!(f, "Internal error: {}", detail),
        }
    }
}

impl std::error::Error for StorageError {}

impl StorageError {
    /// Builds a [`StorageError::Storage`] whose message is the `Display`
    /// rendering of `e`.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        Self::Storage(e.to_string())
    }
}
174
175/// Result type alias for storage operations
176pub type StorageResult<T> = std::result::Result<T, StorageError>;
177
/// Outcome of a write: carries the sequence number the storage engine gave it.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct WriteResult {
    /// Sequence number the underlying storage engine assigned to this write.
    pub seqnum: u64,
}
184
185/// Trait for merging existing values with new values.
186///
187/// Merge operators must be associative: `merge_batch(merge_batch(a, [b]), [c]) == merge_batch(a, merge_batch(b, [c]))`.
188/// This ensures consistent merging behavior regardless of the order of operations.
189pub trait MergeOperator: Send + Sync {
190 /// Merges a batch of operands with an optional existing value.
191 ///
192 /// # Arguments
193 /// * `key` - The key associated with the values being merged
194 /// * `existing_value` - The current value stored in the database (if any)
195 /// * `operands` - A slice of operands to merge, ordered from oldest to newest
196 fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
197}
198
199pub fn default_merge_batch(
200 key: &Bytes,
201 existing_value: Option<Bytes>,
202 operands: &[Bytes],
203 merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
204) -> Bytes {
205 let mut result = existing_value;
206 for operand in operands {
207 result = Some(merge(key, result, operand.clone()));
208 }
209 result.expect("merge_batch called with no existing value and no operands")
210}
211
212/// Iterator over storage records.
213#[async_trait]
214pub trait StorageIterator {
215 /// Returns the next record from this iterator.
216 ///
217 /// Returns `Ok(None)` when the iterator is exhausted.
218 async fn next(&mut self) -> StorageResult<Option<Record>>;
219}
220
221/// Common read operations supported by both Storage and StorageSnapshot.
222///
223/// This trait provides the core read methods that are shared between full storage
224/// access and point-in-time snapshots. By extracting these common operations,
225/// we can write code that works with both storage types.
226#[async_trait]
227pub trait StorageRead: Send + Sync {
228 /// Retrieves a single record by exact key.
229 ///
230 /// Returns `Ok(None)` if the key is not present.
231 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
232
233 /// Returns an iterator over records in the given range.
234 ///
235 /// The returned iterator is owned and does not borrow from the storage,
236 /// allowing it to be stored in structs or passed across await points.
237 async fn scan_iter(
238 &self,
239 range: BytesRange,
240 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
241
242 /// Collects all records in the range into a Vec.
243 #[tracing::instrument(level = "trace", skip_all)]
244 async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
245 let mut iter = self.scan_iter(range).await?;
246 let mut records = Vec::new();
247 while let Some(record) = iter.next().await? {
248 records.push(record);
249 }
250 Ok(records)
251 }
252
253 /// Closes the storage, releasing any resources.
254 ///
255 /// This method should be called before dropping the storage to ensure
256 /// proper cleanup. For SlateDB, this releases the database fence.
257 ///
258 /// # Errors
259 ///
260 /// Returns an error if resource shutdown fails.
261 async fn close(&self) -> StorageResult<()> {
262 Ok(())
263 }
264}
265
266/// A point-in-time snapshot of the storage layer.
267///
268/// Snapshots provide a consistent read-only view of the database at the time
269/// the snapshot was created. Reads from a snapshot will not see any subsequent
270/// writes to the underlying storage.
271#[async_trait]
272pub trait StorageSnapshot: StorageRead {}
273
274/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
275#[async_trait]
276pub trait Storage: StorageRead {
277 /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
278 ///
279 /// All operations in the batch are written together in a single `WriteBatch`,
280 /// so either all succeed or none are visible. This is the primary write method
281 /// used by flushers that produce a mix of operation types from a frozen delta.
282 ///
283 /// Uses `WriteOptions::default()` (`await_durable: false`).
284 async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
285 self.apply_with_options(ops, WriteOptions::default()).await
286 }
287
288 /// Applies a batch of mixed operations with custom write options.
289 ///
290 /// `options.await_durable` controls whether this call returns only after
291 /// the batch reaches durable storage.
292 ///
293 /// # Errors
294 ///
295 /// Returns an error if the batch cannot be applied.
296 async fn apply_with_options(
297 &self,
298 ops: Vec<RecordOp>,
299 options: WriteOptions,
300 ) -> StorageResult<WriteResult>;
301
302 /// Writes records to storage.
303 ///
304 /// Uses `WriteOptions::default()` (`await_durable: false`).
305 async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
306 self.put_with_options(records, WriteOptions::default())
307 .await
308 }
309
310 /// Writes records to storage with custom options.
311 ///
312 /// This method allows control over durability behavior. Use this when you
313 /// need to specify whether to wait for writes to be durable.
314 ///
315 /// # Arguments
316 ///
317 /// * `records` - The records to write
318 /// * `options` - Write options controlling durability behavior
319 ///
320 /// # Errors
321 ///
322 /// Returns an error if any write in the batch fails.
323 async fn put_with_options(
324 &self,
325 records: Vec<PutRecordOp>,
326 options: WriteOptions,
327 ) -> StorageResult<WriteResult>;
328
329 /// Merges values for the given keys using the configured merge operator.
330 ///
331 /// This method requires the underlying storage engine to be configured with
332 /// a merge operator. If no merge operator is configured, this method will
333 /// return a `StorageError::Storage` error.
334 ///
335 /// The merge operation is atomic - all merges in the batch are applied
336 /// together or not at all.
337 ///
338 /// Uses `WriteOptions::default()` (`await_durable: false`).
339 async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
340 self.merge_with_options(records, WriteOptions::default())
341 .await
342 }
343
344 /// Merges values with custom write options.
345 ///
346 /// `options.await_durable` controls whether this call returns only after
347 /// the batch reaches durable storage.
348 ///
349 /// # Errors
350 ///
351 /// Returns an error if no merge operator is configured or if the merge
352 /// operation fails.
353 async fn merge_with_options(
354 &self,
355 records: Vec<MergeRecordOp>,
356 options: WriteOptions,
357 ) -> StorageResult<WriteResult>;
358
359 /// Creates a point-in-time snapshot of the storage.
360 ///
361 /// The snapshot provides a consistent read-only view of the database at the time
362 /// the snapshot was created. Reads from the snapshot will not see any subsequent
363 /// writes to the underlying storage.
364 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
365
366 /// Flushes all pending writes to durable storage.
367 ///
368 /// This ensures that all writes that have been acknowledged are persisted
369 /// to durable storage. For SlateDB, this flushes the memtable to the WAL
370 /// and object store.
371 ///
372 /// # Errors
373 ///
374 /// Returns an error if durability cannot be established.
375 async fn flush(&self) -> StorageResult<()>;
376
377 /// Subscribes to the durable sequence watermark.
378 ///
379 /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
380 /// confirmed durable by the storage engine. For SlateDB, this maps to
381 /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
382 /// "durable" so the watermark matches the latest written seqnum.
383 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
384
385 /// Creates a durable checkpoint of the current state.
386 ///
387 /// The returned [`CheckpointInfo::id`] can later be passed to a
388 /// `StorageReaderRuntime` to open a reader pinned to this exact view of
389 /// the database. For SlateDB, this maps to `Db::create_checkpoint` with
390 /// `CheckpointScope::All` (flushes WALs and freezes the memtable so the
391 /// checkpoint includes recent writes).
392 ///
393 /// # Errors
394 ///
395 /// Returns an error if the underlying backend does not support
396 /// checkpoints, or if the checkpoint cannot be persisted.
397 async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo>;
398}