common/storage/mod.rs
1pub mod config;
2pub mod factory;
3pub mod in_memory;
4pub mod loader;
5pub mod slate;
6pub mod util;
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use bytes::Bytes;
12
13use crate::BytesRange;
14
15#[derive(Clone, Debug)]
16pub struct Record {
17 pub key: Bytes,
18 pub value: Bytes,
19}
20
21impl Record {
22 pub fn new(key: Bytes, value: Bytes) -> Self {
23 Self { key, value }
24 }
25
26 pub fn empty(key: Bytes) -> Self {
27 Self::new(key, Bytes::new())
28 }
29}
30
31#[derive(Clone, Debug)]
32pub enum RecordOp {
33 Put(Record),
34 Merge(Record),
35 Delete(Bytes),
36}
37
/// Options for write operations.
///
/// Passed to [`Storage::put_with_options`] to control durability. The plain
/// [`Storage::put`] does not accept options; use `put_with_options` whenever
/// the durability behavior must be chosen explicitly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct WriteOptions {
    /// Whether to wait for the write to be durable before returning.
    ///
    /// When `true`, the operation will not return until the data has been
    /// persisted to durable storage (e.g., flushed to the WAL and acknowledged
    /// by the object store).
    ///
    /// When `false` (the default), the operation returns as soon as the data
    /// is in memory, providing lower latency but risking data loss on crash.
    pub await_durable: bool,
}
54
/// Errors produced by storage operations.
///
/// Each variant carries a human-readable message that is included in the
/// `Display` output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StorageError {
    /// A failure reported by the storage layer.
    Storage(String),
    /// An internal failure.
    Internal(String),
}

impl std::fmt::Display for StorageError {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Pick the label and message first, then format once.
        let (kind, msg) = match self {
            StorageError::Storage(msg) => ("Storage", msg),
            StorageError::Internal(msg) => ("Internal", msg),
        };
        write!(f, "{} error: {}", kind, msg)
    }
}

impl std::error::Error for StorageError {}

impl StorageError {
    /// Wraps any displayable error as [`StorageError::Storage`], keeping only
    /// its rendered message.
    pub fn from_storage(e: impl std::fmt::Display) -> Self {
        Self::Storage(e.to_string())
    }
}

/// Convenience alias for results whose error type is [`StorageError`].
pub type StorageResult<T> = std::result::Result<T, StorageError>;
84
85/// Trait for merging existing values with new values.
86///
87/// Merge operators must be associative: `merge(merge(a, b), c) == merge(a, merge(b, c))`.
88/// This ensures consistent merging behavior regardless of the order of operations.
89pub trait MergeOperator: Send + Sync {
90 /// Merges an existing value with a new value to produce a merged result.
91 ///
92 /// # Arguments
93 /// * `key` - The key associated with the values being merged
94 /// * `existing_value` - The current value stored in the database (if any)
95 /// * `new_value` - The new value to merge with the existing value
96 ///
97 /// # Returns
98 /// The merged value.
99 fn merge(&self, key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes;
100}
101
102/// Iterator over storage records.
103#[async_trait]
104pub trait StorageIterator {
105 async fn next(&mut self) -> StorageResult<Option<Record>>;
106}
107
108/// Common read operations supported by both Storage and StorageSnapshot.
109///
110/// This trait provides the core read methods that are shared between full storage
111/// access and point-in-time snapshots. By extracting these common operations,
112/// we can write code that works with both storage types.
113#[async_trait]
114pub trait StorageRead: Send + Sync {
115 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>>;
116
117 /// Returns an iterator over records in the given range.
118 ///
119 /// The returned iterator is owned and does not borrow from the storage,
120 /// allowing it to be stored in structs or passed across await points.
121 async fn scan_iter(
122 &self,
123 range: BytesRange,
124 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>>;
125
126 /// Collects all records in the range into a Vec.
127 #[tracing::instrument(level = "trace", skip_all)]
128 async fn scan(&self, range: BytesRange) -> StorageResult<Vec<Record>> {
129 let mut iter = self.scan_iter(range).await?;
130 let mut records = Vec::new();
131 while let Some(record) = iter.next().await? {
132 records.push(record);
133 }
134 Ok(records)
135 }
136}
137
138/// A point-in-time snapshot of the storage layer.
139///
140/// Snapshots provide a consistent read-only view of the database at the time
141/// the snapshot was created. Reads from a snapshot will not see any subsequent
142/// writes to the underlying storage.
143#[async_trait]
144pub trait StorageSnapshot: StorageRead {}
145
146/// The storage type encapsulates access to the underlying storage (e.g. SlateDB).
147#[async_trait]
148pub trait Storage: StorageRead {
149 async fn apply(&self, ops: Vec<RecordOp>) -> StorageResult<()>;
150
151 async fn put(&self, records: Vec<Record>) -> StorageResult<()>;
152
153 /// Writes records to storage with custom options.
154 ///
155 /// This method allows control over durability behavior. Use this when you
156 /// need to specify whether to wait for writes to be durable.
157 ///
158 /// # Arguments
159 ///
160 /// * `records` - The records to write
161 /// * `options` - Write options controlling durability behavior
162 async fn put_with_options(
163 &self,
164 records: Vec<Record>,
165 options: WriteOptions,
166 ) -> StorageResult<()>;
167
168 /// Merges values for the given keys using the configured merge operator.
169 ///
170 /// This method requires the underlying storage engine to be configured with
171 /// a merge operator. If no merge operator is configured, this method will
172 /// return a `StorageError::Storage` error.
173 ///
174 /// The merge operation is atomic - all merges in the batch are applied
175 /// together or not at all.
176 async fn merge(&self, records: Vec<Record>) -> StorageResult<()>;
177
178 /// Creates a point-in-time snapshot of the storage.
179 ///
180 /// The snapshot provides a consistent read-only view of the database at the time
181 /// the snapshot was created. Reads from the snapshot will not see any subsequent
182 /// writes to the underlying storage.
183 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>>;
184
185 /// Flushes all pending writes to durable storage.
186 ///
187 /// This ensures that all writes that have been acknowledged are persisted
188 /// to durable storage. For SlateDB, this flushes the memtable to the WAL
189 /// and object store.
190 async fn flush(&self) -> StorageResult<()>;
191
192 /// Closes the storage, releasing any resources.
193 ///
194 /// This method should be called before dropping the storage to ensure
195 /// proper cleanup. For SlateDB, this releases the database fence.
196 async fn close(&self) -> StorageResult<()>;
197}