common/storage/mod.rs
1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod metrics_recorder;
6pub mod slate;
7pub mod sst_blocks;
8pub mod util;
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13use bytes::Bytes;
14use uuid::Uuid;
15
16use crate::BytesRange;
17
/// Identifies a checkpoint of a storage backend at a point in time.
///
/// Checkpoints capture a manifest snapshot that a reader can later open
/// against to get a consistent view of the database at the time the
/// checkpoint was created.
///
/// Values of this type are returned by [`Storage::create_checkpoint`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CheckpointInfo {
    /// Unique identifier of the checkpoint.
    pub id: Uuid,
    /// Identifier of the manifest captured by the checkpoint.
    ///
    /// NOTE(review): presumably the backend's manifest version number — confirm.
    pub manifest_id: u64,
}
28
/// Time-to-live policy attached to a write.
///
/// Carried by [`PutOptions`] and [`MergeOptions`]. [`Ttl::Default`] is the
/// value produced by `Ttl::default()`.
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub enum Ttl {
    /// NOTE(review): presumably defers to the storage backend's configured
    /// default TTL — confirm against the backend mapping.
    #[default]
    Default,
    /// NOTE(review): presumably the record never expires — confirm.
    NoExpiry,
    /// NOTE(review): presumably a lifetime relative to the write time; the
    /// unit is not visible in this file — confirm.
    ExpireAfter(u64),
    /// NOTE(review): presumably an absolute expiry timestamp; the unit and
    /// epoch are not visible in this file — confirm.
    ExpireAt(i64),
}
37
/// Per-record options for a put operation.
///
/// `PutOptions::default()` carries [`Ttl::Default`].
#[derive(Clone, Copy, Default, Debug, PartialEq, Eq)]
pub struct PutOptions {
    /// Time-to-live policy applied to the record being put.
    pub ttl: Ttl,
}
42
/// Encapsulates a record being put along with options specific to the put.
#[derive(Clone, Debug)]
pub struct PutRecordOp {
    /// The key/value pair to write.
    pub record: Record,
    /// Options that apply to this put only.
    pub options: PutOptions,
}
49
50impl PutRecordOp {
51 pub fn new(record: Record) -> Self {
52 Self {
53 record,
54 options: PutOptions::default(),
55 }
56 }
57
58 pub fn new_with_options(record: Record, options: PutOptions) -> Self {
59 Self { record, options }
60 }
61
62 pub fn with_options(self, options: PutOptions) -> Self {
63 Self {
64 record: self.record,
65 options,
66 }
67 }
68}
69
70/// Converts a Record to a PutRecordOp with default options
71impl From<Record> for PutRecordOp {
72 fn from(record: Record) -> Self {
73 Self::new(record)
74 }
75}
76
/// Per-record options for a merge operation.
///
/// `MergeOptions::default()` carries [`Ttl::Default`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct MergeOptions {
    /// Time-to-live policy applied to the merge operand being written.
    pub ttl: Ttl,
}
81
/// Encapsulates a record written as part of a merge op along with options specific to the merge.
#[derive(Clone, Debug)]
pub struct MergeRecordOp {
    /// The key and the merge operand to write for that key.
    pub record: Record,
    /// Options that apply to this merge only.
    pub options: MergeOptions,
}
88
89impl MergeRecordOp {
90 pub fn new(record: Record) -> Self {
91 Self {
92 record,
93 options: MergeOptions::default(),
94 }
95 }
96
97 pub fn new_with_ttl(record: Record, options: MergeOptions) -> Self {
98 Self { record, options }
99 }
100}
101
102/// Converts a Record to a PutRecordOp with default options
103impl From<Record> for MergeRecordOp {
104 fn from(record: Record) -> Self {
105 Self::new(record)
106 }
107}
108
/// A key/value pair as stored in, or read from, the storage layer.
#[derive(Clone, Debug)]
pub struct Record {
    /// The record's key.
    pub key: Bytes,
    /// The record's value; may be empty (see [`Record::empty`]).
    pub value: Bytes,
}
114
115impl Record {
116 pub fn new(key: Bytes, value: Bytes) -> Self {
117 Self { key, value }
118 }
119
120 pub fn empty(key: Bytes) -> Self {
121 Self::new(key, Bytes::new())
122 }
123}
124
/// A single operation inside a mixed write batch (see [`Storage::apply`]).
#[derive(Clone, Debug)]
pub enum RecordOp {
    /// Put a record, with its put-specific options.
    Put(PutRecordOp),
    /// Write a merge operand, with its merge-specific options.
    Merge(MergeRecordOp),
    /// Delete; NOTE(review): the payload is presumably the key to delete — confirm.
    Delete(Bytes),
}
131
/// Options for write operations.
///
/// Controls the durability behavior of write operations. The convenience
/// methods [`Storage::apply`], [`Storage::put`] and [`Storage::merge`] use
/// `WriteOptions::default()`; [`Storage::apply_with_options`],
/// [`Storage::put_with_options`] and [`Storage::merge_with_options`] accept an
/// explicit value.
#[derive(Debug, Clone, Default)]
pub struct WriteOptions {
    /// Whether to wait for the write to be durable before returning.
    ///
    /// When `true`, the operation will not return until the data has been
    /// persisted to durable storage (e.g., flushed to the WAL and acknowledged
    /// by the object store).
    ///
    /// When `false` (the default), the operation returns as soon as the data
    /// is in memory, providing lower latency but risking data loss on crash.
    pub await_durable: bool,
}
148
/// Error type for storage operations.
///
/// Both variants carry a human-readable message; the [`std::fmt::Display`]
/// output prefixes that message with the variant's category.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageError {
    /// Storage-related errors
    Storage(String),
    /// Internal errors
    Internal(String),
}

// Relies entirely on the trait's provided methods, so no underlying
// `source` is reported.
impl std::error::Error for StorageError {}

impl std::fmt::Display for StorageError {
    /// Writes `"Storage error: <msg>"` or `"Internal error: <msg>"`.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Storage(detail) => write!(out, "Storage error: {}", detail),
            Self::Internal(detail) => write!(out, "Internal error: {}", detail),
        }
    }
}

impl StorageError {
    /// Wraps any displayable error as [`StorageError::Storage`], keeping only
    /// its `Display` text.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        Self::Storage(e.to_string())
    }
}
175
/// Result type alias for storage operations.
///
/// Shorthand for a `Result` whose error type is [`StorageError`].
pub type StorageResult<T> = std::result::Result<T, StorageError>;
178
/// Result of a write operation, containing the sequence number assigned by the storage engine.
///
/// Returned by the write methods on [`Storage`] (`apply*`, `put*`, `merge*`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct WriteResult {
    /// The sequence number assigned to this write by the underlying storage engine.
    pub seqnum: u64,
}
185
/// Trait for merging existing values with new values.
///
/// Merge operators must be associative. Writing `m(v, ops)` for
/// `merge_batch(key, v, ops)` with a fixed `key`, implementations must satisfy
/// `m(Some(m(Some(a), [b])), [c]) == m(Some(a), [m(Some(b), [c])])`.
/// This ensures consistent merging behavior regardless of how the operands
/// end up being grouped.
pub trait MergeOperator: Send + Sync {
    /// Merges a batch of operands with an optional existing value.
    ///
    /// # Arguments
    /// * `key` - The key associated with the values being merged
    /// * `existing_value` - The current value stored in the database (if any)
    /// * `operands` - A slice of operands to merge, ordered from oldest to newest
    ///
    /// # Returns
    /// The merged value.
    fn merge_batch(&self, key: &Bytes, existing_value: Option<Bytes>, operands: &[Bytes]) -> Bytes;
}
199
200pub fn default_merge_batch(
201 key: &Bytes,
202 existing_value: Option<Bytes>,
203 operands: &[Bytes],
204 merge: impl Fn(&Bytes, Option<Bytes>, Bytes) -> Bytes,
205) -> Bytes {
206 let mut result = existing_value;
207 for operand in operands {
208 result = Some(merge(key, result, operand.clone()));
209 }
210 result.expect("merge_batch called with no existing value and no operands")
211}
212
/// Iterator over storage records.
///
/// Asynchronous, pull-based: callers repeatedly await [`StorageIterator::next`]
/// until it yields `Ok(None)`.
#[async_trait]
pub trait StorageIterator {
    /// Returns the next record from this iterator.
    ///
    /// Returns `Ok(None)` when the iterator is exhausted.
    ///
    /// # Errors
    ///
    /// Returns a [`StorageError`] if the next record cannot be produced.
    async fn next(&mut self) -> StorageResult<Option<Record>>;
}
221
222/// Common read operations supported by both Storage and StorageSnapshot.
223///
224/// This trait provides the core read methods that are shared between full storage
225/// access and point-in-time snapshots. By extracting these common operations,
226/// we can write code that works with both storage types.
227#[async_trait]
228pub trait StorageRead: Send + Sync {
229 /// Retrieves a single record by exact key.
230 ///
231 /// Returns `Ok(None)` if the key is not present.
232 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
233
234 /// Returns an iterator over records in the given range.
235 ///
236 /// The returned iterator is owned and does not borrow from the storage,
237 /// allowing it to be stored in structs or passed across await points.
238 async fn scan_iter(
239 &self,
240 range: BytesRange,
241 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
242
243 /// Collects all records in the range into a Vec.
244 #[tracing::instrument(level = "trace", skip_all)]
245 async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
246 let mut iter = self.scan_iter(range).await?;
247 let mut records = Vec::new();
248 while let Some(record) = iter.next().await? {
249 records.push(record);
250 }
251 Ok(records)
252 }
253
254 /// Closes the storage, releasing any resources.
255 ///
256 /// This method should be called before dropping the storage to ensure
257 /// proper cleanup. For SlateDB, this releases the database fence.
258 ///
259 /// # Errors
260 ///
261 /// Returns an error if resource shutdown fails.
262 async fn close(&self) -> StorageResult<()> {
263 Ok(())
264 }
265}
266
/// A point-in-time snapshot of the storage layer.
///
/// Snapshots provide a consistent read-only view of the database at the time
/// the snapshot was created. Reads from a snapshot will not see any subsequent
/// writes to the underlying storage.
///
/// This trait adds no methods of its own; all access goes through
/// [`StorageRead`]. Snapshots are obtained from [`Storage::snapshot`].
#[async_trait]
pub trait StorageSnapshot: StorageRead {}
274
275/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
276#[async_trait]
277pub trait Storage: StorageRead {
278 /// Applies a batch of mixed operations (puts, merges, and deletes) atomically.
279 ///
280 /// All operations in the batch are written together in a single `WriteBatch`,
281 /// so either all succeed or none are visible. This is the primary write method
282 /// used by flushers that produce a mix of operation types from a frozen delta.
283 ///
284 /// Uses `WriteOptions::default()` (`await_durable: false`).
285 async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<WriteResult> {
286 self.apply_with_options(ops, WriteOptions::default()).await
287 }
288
289 /// Applies a batch of mixed operations with custom write options.
290 ///
291 /// `options.await_durable` controls whether this call returns only after
292 /// the batch reaches durable storage.
293 ///
294 /// # Errors
295 ///
296 /// Returns an error if the batch cannot be applied.
297 async fn apply_with_options(
298 &self,
299 ops: Vec<RecordOp>,
300 options: WriteOptions,
301 ) -> StorageResult<WriteResult>;
302
303 /// Writes records to storage.
304 ///
305 /// Uses `WriteOptions::default()` (`await_durable: false`).
306 async fn put(&self, records: Vec<PutRecordOp>) -> StorageResult<WriteResult> {
307 self.put_with_options(records, WriteOptions::default())
308 .await
309 }
310
311 /// Writes records to storage with custom options.
312 ///
313 /// This method allows control over durability behavior. Use this when you
314 /// need to specify whether to wait for writes to be durable.
315 ///
316 /// # Arguments
317 ///
318 /// * `records` - The records to write
319 /// * `options` - Write options controlling durability behavior
320 ///
321 /// # Errors
322 ///
323 /// Returns an error if any write in the batch fails.
324 async fn put_with_options(
325 &self,
326 records: Vec<PutRecordOp>,
327 options: WriteOptions,
328 ) -> StorageResult<WriteResult>;
329
330 /// Merges values for the given keys using the configured merge operator.
331 ///
332 /// This method requires the underlying storage engine to be configured with
333 /// a merge operator. If no merge operator is configured, this method will
334 /// return a `StorageError::Storage` error.
335 ///
336 /// The merge operation is atomic - all merges in the batch are applied
337 /// together or not at all.
338 ///
339 /// Uses `WriteOptions::default()` (`await_durable: false`).
340 async fn merge(&self, records: Vec<MergeRecordOp>) -> StorageResult<WriteResult> {
341 self.merge_with_options(records, WriteOptions::default())
342 .await
343 }
344
345 /// Merges values with custom write options.
346 ///
347 /// `options.await_durable` controls whether this call returns only after
348 /// the batch reaches durable storage.
349 ///
350 /// # Errors
351 ///
352 /// Returns an error if no merge operator is configured or if the merge
353 /// operation fails.
354 async fn merge_with_options(
355 &self,
356 records: Vec<MergeRecordOp>,
357 options: WriteOptions,
358 ) -> StorageResult<WriteResult>;
359
360 /// Creates a point-in-time snapshot of the storage.
361 ///
362 /// The snapshot provides a consistent read-only view of the database at the time
363 /// the snapshot was created. Reads from the snapshot will not see any subsequent
364 /// writes to the underlying storage.
365 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
366
367 /// Flushes all pending writes to durable storage.
368 ///
369 /// This ensures that all writes that have been acknowledged are persisted
370 /// to durable storage. For SlateDB, this flushes the memtable to the WAL
371 /// and object store.
372 ///
373 /// # Errors
374 ///
375 /// Returns an error if durability cannot be established.
376 async fn flush(&self) -> StorageResult<()>;
377
378 /// Subscribes to the durable sequence watermark.
379 ///
380 /// Returns a `watch::Receiver<u64>` that tracks the highest sequence number
381 /// confirmed durable by the storage engine. For SlateDB, this maps to
382 /// `DbStatus::durable_seq`. For in-memory storage, writes are immediately
383 /// "durable" so the watermark matches the latest written seqnum.
384 fn subscribe_durable(&self) -> tokio::sync::watch::Receiver<u64>;
385
386 /// Creates a durable checkpoint of the current state.
387 ///
388 /// The returned [`CheckpointInfo::id`] can later be passed to a
389 /// `StorageReaderRuntime` to open a reader pinned to this exact view of
390 /// the database. For SlateDB, this maps to `Db::create_checkpoint` with
391 /// `CheckpointScope::All` (flushes WALs and freezes the memtable so the
392 /// checkpoint includes recent writes).
393 ///
394 /// # Errors
395 ///
396 /// Returns an error if the underlying backend does not support
397 /// checkpoints, or if the checkpoint cannot be persisted.
398 async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo>;
399}