//! SlateDB-backed implementations of the common storage traits
//! (`common/storage/slate.rs`).
1use std::sync::Arc;
2
3use crate::{
4    BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult,
5    storage::{MergeOperator, RecordOp, Storage, StorageSnapshot, WriteOptions},
6};
7use async_trait::async_trait;
8use bytes::Bytes;
9use slatedb::config::ScanOptions;
10use slatedb::{
11    Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
12    MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
13};
14
/// Adapter that wraps our `MergeOperator` trait to implement SlateDB's `MergeOperator` trait.
///
/// This allows using our common merge operator interface with SlateDB's merge functionality.
pub struct SlateDbMergeOperatorAdapter {
    // User-supplied operator that performs the actual merge; shared via `Arc`
    // because SlateDB holds the adapter for the lifetime of the database.
    operator: Arc<dyn MergeOperator>,
}
21
impl SlateDbMergeOperatorAdapter {
    /// Wraps `operator` in the adapter. Kept private: external callers go
    /// through `SlateDbStorage::merge_operator_adapter`.
    fn new(operator: Arc<dyn MergeOperator>) -> Self {
        Self { operator }
    }
}
27
impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
    /// Delegates the merge to the wrapped operator.
    ///
    /// Our `MergeOperator::merge` returns `Bytes` directly (it is infallible),
    /// so this adapter never produces a `MergeOperatorError`.
    fn merge(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        Ok(self.operator.merge(key, existing_value, value))
    }
}
38
39/// Returns the default scan options used for storage scans.
40fn default_scan_options() -> ScanOptions {
41    ScanOptions {
42        durability_filter: Default::default(),
43        dirty: false,
44        read_ahead_bytes: 1024 * 1024,
45        cache_blocks: true,
46        max_fetch_tasks: 4,
47    }
48}
49
/// SlateDB-backed implementation of the Storage trait.
///
/// SlateDB is an embedded key-value store built on object storage, providing
/// LSM-tree semantics with cloud-native durability.
pub struct SlateDbStorage {
    // Shared handle to the open SlateDB database; `pub(super)` so sibling
    // modules in `storage` can reach the raw handle when needed.
    pub(super) db: Arc<Db>,
}
57
58impl SlateDbStorage {
59    /// Creates a new SlateDbStorage instance wrapping the given SlateDB database.
60    pub fn new(db: Arc<Db>) -> Self {
61        Self { db }
62    }
63
64    /// Creates a SlateDB `MergeOperator` from our common `MergeOperator` trait.
65    ///
66    /// This adapter can be used when constructing a SlateDB database with a merge operator:
67    /// ```rust,ignore
68    /// use common::storage::MergeOperator;
69    /// use slatedb::{DbBuilder, object_store::ObjectStore};
70    ///
71    /// let my_merge_op: Arc<dyn MergeOperator> = Arc::new(MyMergeOperator);
72    /// let slate_merge_op = SlateDbStorage::merge_operator_adapter(my_merge_op);
73    ///
74    /// let db = DbBuilder::new("path", object_store)
75    ///     .with_merge_operator(Arc::new(slate_merge_op))
76    ///     .build()
77    ///     .await?;
78    /// ```
79    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
80        SlateDbMergeOperatorAdapter::new(operator)
81    }
82}
83
84#[async_trait]
85impl StorageRead for SlateDbStorage {
86    /// Retrieves a single record by key from SlateDB.
87    ///
88    /// Returns `None` if the key does not exist.
89    #[tracing::instrument(level = "trace", skip_all)]
90    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
91        let value = self
92            .db
93            .get(&key)
94            .await
95            .map_err(StorageError::from_storage)?;
96
97        match value {
98            Some(v) => Ok(Some(Record::new(key, v))),
99            None => Ok(None),
100        }
101    }
102
103    #[tracing::instrument(level = "trace", skip_all)]
104    async fn scan_iter(
105        &self,
106        range: BytesRange,
107    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
108        let iter = self
109            .db
110            .scan_with_options(range, &default_scan_options())
111            .await
112            .map_err(StorageError::from_storage)?;
113        Ok(Box::new(SlateDbIterator { iter }))
114    }
115}
116
/// Thin wrapper adapting SlateDB's `DbIterator` to our `StorageIterator` trait.
pub(super) struct SlateDbIterator {
    // Underlying SlateDB iterator produced by `scan_with_options`.
    iter: DbIterator,
}
120
121#[async_trait]
122impl StorageIterator for SlateDbIterator {
123    #[tracing::instrument(level = "trace", skip_all)]
124    async fn next(&mut self) -> StorageResult<Option<Record>> {
125        match self.iter.next().await.map_err(StorageError::from_storage)? {
126            Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
127            None => Ok(None),
128        }
129    }
130}
131
/// SlateDB snapshot wrapper that implements StorageSnapshot.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
pub struct SlateDbStorageSnapshot {
    // Point-in-time SlateDB snapshot; `Arc` so the wrapper itself can be
    // shared behind `Arc<dyn StorageSnapshot>`.
    snapshot: Arc<DbSnapshot>,
}
138
139#[async_trait]
140impl StorageRead for SlateDbStorageSnapshot {
141    #[tracing::instrument(level = "trace", skip_all)]
142    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
143        let value = self
144            .snapshot
145            .get(&key)
146            .await
147            .map_err(StorageError::from_storage)?;
148
149        match value {
150            Some(v) => Ok(Some(Record::new(key, v))),
151            None => Ok(None),
152        }
153    }
154
155    #[tracing::instrument(level = "trace", skip_all)]
156    async fn scan_iter(
157        &self,
158        range: BytesRange,
159    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
160        let iter = self
161            .snapshot
162            .scan_with_options(range, &default_scan_options())
163            .await
164            .map_err(StorageError::from_storage)?;
165        Ok(Box::new(SlateDbIterator { iter }))
166    }
167}
168
// Marker impl: the snapshot's read API comes from `StorageRead` above;
// `StorageSnapshot` adds no additional required methods here.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
171
172#[async_trait]
173impl Storage for SlateDbStorage {
174    async fn apply(&self, records: Vec<RecordOp>) -> StorageResult<()> {
175        let mut batch = WriteBatch::new();
176        for op in records {
177            match op {
178                RecordOp::Put(record) => batch.put(record.key, record.value),
179                RecordOp::Merge(record) => batch.merge(record.key, record.value),
180                RecordOp::Delete(key) => batch.delete(key),
181            }
182        }
183        self.db
184            .write(batch)
185            .await
186            .map_err(StorageError::from_storage)?;
187        Ok(())
188    }
189    async fn put(&self, records: Vec<Record>) -> StorageResult<()> {
190        self.put_with_options(records, WriteOptions::default())
191            .await
192    }
193
194    async fn put_with_options(
195        &self,
196        records: Vec<Record>,
197        options: WriteOptions,
198    ) -> StorageResult<()> {
199        let mut batch = WriteBatch::new();
200        for record in records {
201            batch.put(record.key, record.value);
202        }
203        let slate_options = SlateDbWriteOptions {
204            await_durable: options.await_durable,
205        };
206        self.db
207            .write_with_options(batch, &slate_options)
208            .await
209            .map_err(StorageError::from_storage)?;
210        Ok(())
211    }
212
213    /// Merges values for the given keys using SlateDB's merge operator.
214    ///
215    /// This method requires the database to be configured with a merge operator
216    /// during construction. If no merge operator is configured, this will return
217    /// a `StorageError::Storage` error.
218    async fn merge(&self, records: Vec<Record>) -> StorageResult<()> {
219        let mut batch = WriteBatch::new();
220        for record in records {
221            batch.merge(record.key, record.value);
222        }
223        self.db.write(batch).await.map_err(|e| {
224            let error_msg = e.to_string();
225            // Check if the error indicates merge operator is not configured
226            if error_msg.contains("merge operator") || error_msg.contains("not configured") {
227                StorageError::Storage("Merge operator not configured for this database".to_string())
228            } else {
229                StorageError::from_storage(e)
230            }
231        })?;
232        Ok(())
233    }
234
235    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
236        let snapshot = self
237            .db
238            .snapshot()
239            .await
240            .map_err(StorageError::from_storage)?;
241        Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
242    }
243
244    async fn flush(&self) -> StorageResult<()> {
245        self.db.flush().await.map_err(StorageError::from_storage)?;
246        Ok(())
247    }
248
249    async fn close(&self) -> StorageResult<()> {
250        self.db.close().await.map_err(StorageError::from_storage)?;
251        Ok(())
252    }
253}
254
/// Read-only SlateDB storage using `DbReader`.
///
/// This struct provides read-only access to a SlateDB database without fencing,
/// allowing multiple readers to coexist with a single writer.
pub struct SlateDbStorageReader {
    // Shared handle to the read-only SlateDB reader.
    reader: Arc<DbReader>,
}
262
263impl SlateDbStorageReader {
264    /// Creates a new SlateDbStorageReader wrapping the given DbReader.
265    pub fn new(reader: Arc<DbReader>) -> Self {
266        Self { reader }
267    }
268}
269
270#[async_trait]
271impl StorageRead for SlateDbStorageReader {
272    #[tracing::instrument(level = "trace", skip_all)]
273    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
274        let value = self
275            .reader
276            .get(&key)
277            .await
278            .map_err(StorageError::from_storage)?;
279
280        match value {
281            Some(v) => Ok(Some(Record::new(key, v))),
282            None => Ok(None),
283        }
284    }
285
286    #[tracing::instrument(level = "trace", skip_all)]
287    async fn scan_iter(
288        &self,
289        range: BytesRange,
290    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
291        let iter = self
292            .reader
293            .scan_with_options(range, &default_scan_options())
294            .await
295            .map_err(StorageError::from_storage)?;
296        Ok(Box::new(SlateDbIterator { iter }))
297    }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::object_store::memory::InMemory;

    // Writer puts two records; an independent DbReader over the same object
    // store must see them after a flush, and miss an absent key.
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")),
                Record::new(Bytes::from("key2"), Bytes::from("value2")),
            ])
            .await
            .unwrap();
        // Flush so the reader (which sees only durable data) can observe the writes.
        storage.flush().await.unwrap();

        // Create reader and verify data
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        // An unknown key must return None, not an error.
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }

    // An unbounded scan through the reader must yield all written records
    // in key order.
    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")),
                Record::new(Bytes::from("b"), Bytes::from("2")),
                Record::new(Bytes::from("c"), Bytes::from("3")),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader and scan data
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        // LSM scans return entries sorted by key.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }

    // Opening a DbReader while the writer is still open must not trip
    // SlateDB's writer fencing, and both sides must keep working.
    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write initial data
        storage
            .put(vec![Record::new(
                Bytes::from("key1"),
                Bytes::from("value1"),
            )])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader while writer is still open - this should NOT cause fencing error
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // Reader can read the data
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Writer can still write more data
        storage
            .put(vec![Record::new(
                Bytes::from("key2"),
                Bytes::from("value2"),
            )])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }
}