1use std::sync::Arc;
2
3use crate::{
4 BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult,
5 storage::{MergeOperator, RecordOp, Storage, StorageSnapshot, WriteOptions},
6};
7use async_trait::async_trait;
8use bytes::Bytes;
9use slatedb::config::ScanOptions;
10use slatedb::{
11 Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
12 MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
13};
14
/// Adapts a crate-level [`MergeOperator`] to the merge-operator trait that
/// slatedb expects, so user-defined merge logic can be registered with a [`Db`].
pub struct SlateDbMergeOperatorAdapter {
    // The wrapped operator; shared ownership because slatedb holds it for the db lifetime.
    operator: Arc<dyn MergeOperator>,
}
21
22impl SlateDbMergeOperatorAdapter {
23 fn new(operator: Arc<dyn MergeOperator>) -> Self {
24 Self { operator }
25 }
26}
27
28impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
29 fn merge(
30 &self,
31 key: &Bytes,
32 existing_value: Option<Bytes>,
33 value: Bytes,
34 ) -> Result<Bytes, MergeOperatorError> {
35 Ok(self.operator.merge(key, existing_value, value))
36 }
37}
38
39fn default_scan_options() -> ScanOptions {
41 ScanOptions {
42 durability_filter: Default::default(),
43 dirty: false,
44 read_ahead_bytes: 1024 * 1024,
45 cache_blocks: true,
46 max_fetch_tasks: 4,
47 }
48}
49
/// [`Storage`] implementation backed by a slatedb [`Db`] handle.
pub struct SlateDbStorage {
    // Shared handle; `Arc` lets snapshots and other components hold the same db.
    pub(super) db: Arc<Db>,
}
57
58impl SlateDbStorage {
59 pub fn new(db: Arc<Db>) -> Self {
61 Self { db }
62 }
63
64 pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
80 SlateDbMergeOperatorAdapter::new(operator)
81 }
82}
83
84#[async_trait]
85impl StorageRead for SlateDbStorage {
86 #[tracing::instrument(level = "trace", skip_all)]
90 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
91 let value = self
92 .db
93 .get(&key)
94 .await
95 .map_err(StorageError::from_storage)?;
96
97 match value {
98 Some(v) => Ok(Some(Record::new(key, v))),
99 None => Ok(None),
100 }
101 }
102
103 #[tracing::instrument(level = "trace", skip_all)]
104 async fn scan_iter(
105 &self,
106 range: BytesRange,
107 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
108 let iter = self
109 .db
110 .scan_with_options(range, &default_scan_options())
111 .await
112 .map_err(StorageError::from_storage)?;
113 Ok(Box::new(SlateDbIterator { iter }))
114 }
115}
116
/// [`StorageIterator`] wrapper over a slatedb scan iterator; used by all
/// `scan_iter` implementations in this module.
pub(super) struct SlateDbIterator {
    iter: DbIterator,
}
120
121#[async_trait]
122impl StorageIterator for SlateDbIterator {
123 #[tracing::instrument(level = "trace", skip_all)]
124 async fn next(&mut self) -> StorageResult<Option<Record>> {
125 match self.iter.next().await.map_err(StorageError::from_storage)? {
126 Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
127 None => Ok(None),
128 }
129 }
130}
131
/// Read-only view over a point-in-time slatedb snapshot.
pub struct SlateDbStorageSnapshot {
    snapshot: Arc<DbSnapshot>,
}
138
139#[async_trait]
140impl StorageRead for SlateDbStorageSnapshot {
141 #[tracing::instrument(level = "trace", skip_all)]
142 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
143 let value = self
144 .snapshot
145 .get(&key)
146 .await
147 .map_err(StorageError::from_storage)?;
148
149 match value {
150 Some(v) => Ok(Some(Record::new(key, v))),
151 None => Ok(None),
152 }
153 }
154
155 #[tracing::instrument(level = "trace", skip_all)]
156 async fn scan_iter(
157 &self,
158 range: BytesRange,
159 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
160 let iter = self
161 .snapshot
162 .scan_with_options(range, &default_scan_options())
163 .await
164 .map_err(StorageError::from_storage)?;
165 Ok(Box::new(SlateDbIterator { iter }))
166 }
167}
168
// Marker impl: `StorageSnapshot` requires nothing beyond `StorageRead` here.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
171
172#[async_trait]
173impl Storage for SlateDbStorage {
174 async fn apply(&self, records: Vec<RecordOp>) -> StorageResult<()> {
175 let mut batch = WriteBatch::new();
176 for op in records {
177 match op {
178 RecordOp::Put(record) => batch.put(record.key, record.value),
179 RecordOp::Merge(record) => batch.merge(record.key, record.value),
180 RecordOp::Delete(key) => batch.delete(key),
181 }
182 }
183 self.db
184 .write(batch)
185 .await
186 .map_err(StorageError::from_storage)?;
187 Ok(())
188 }
189 async fn put(&self, records: Vec<Record>) -> StorageResult<()> {
190 self.put_with_options(records, WriteOptions::default())
191 .await
192 }
193
194 async fn put_with_options(
195 &self,
196 records: Vec<Record>,
197 options: WriteOptions,
198 ) -> StorageResult<()> {
199 let mut batch = WriteBatch::new();
200 for record in records {
201 batch.put(record.key, record.value);
202 }
203 let slate_options = SlateDbWriteOptions {
204 await_durable: options.await_durable,
205 };
206 self.db
207 .write_with_options(batch, &slate_options)
208 .await
209 .map_err(StorageError::from_storage)?;
210 Ok(())
211 }
212
213 async fn merge(&self, records: Vec<Record>) -> StorageResult<()> {
219 let mut batch = WriteBatch::new();
220 for record in records {
221 batch.merge(record.key, record.value);
222 }
223 self.db.write(batch).await.map_err(|e| {
224 let error_msg = e.to_string();
225 if error_msg.contains("merge operator") || error_msg.contains("not configured") {
227 StorageError::Storage("Merge operator not configured for this database".to_string())
228 } else {
229 StorageError::from_storage(e)
230 }
231 })?;
232 Ok(())
233 }
234
235 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
236 let snapshot = self
237 .db
238 .snapshot()
239 .await
240 .map_err(StorageError::from_storage)?;
241 Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
242 }
243
244 async fn flush(&self) -> StorageResult<()> {
245 self.db.flush().await.map_err(StorageError::from_storage)?;
246 Ok(())
247 }
248
249 async fn close(&self) -> StorageResult<()> {
250 self.db.close().await.map_err(StorageError::from_storage)?;
251 Ok(())
252 }
253}
254
/// Read-only [`StorageRead`] implementation backed by a slatedb [`DbReader`],
/// allowing reads without opening a writer handle.
pub struct SlateDbStorageReader {
    reader: Arc<DbReader>,
}
262
263impl SlateDbStorageReader {
264 pub fn new(reader: Arc<DbReader>) -> Self {
266 Self { reader }
267 }
268}
269
270#[async_trait]
271impl StorageRead for SlateDbStorageReader {
272 #[tracing::instrument(level = "trace", skip_all)]
273 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
274 let value = self
275 .reader
276 .get(&key)
277 .await
278 .map_err(StorageError::from_storage)?;
279
280 match value {
281 Some(v) => Ok(Some(Record::new(key, v))),
282 None => Ok(None),
283 }
284 }
285
286 #[tracing::instrument(level = "trace", skip_all)]
287 async fn scan_iter(
288 &self,
289 range: BytesRange,
290 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
291 let iter = self
292 .reader
293 .scan_with_options(range, &default_scan_options())
294 .await
295 .map_err(StorageError::from_storage)?;
296 Ok(Box::new(SlateDbIterator { iter }))
297 }
298}
299
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::object_store::memory::InMemory;

    // Writer persists via put + flush; a separate DbReader over the same
    // object store must then observe the data.
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")),
                Record::new(Bytes::from("key2"), Bytes::from("value2")),
            ])
            .await
            .unwrap();
        // Flush before opening the reader so the writes are visible to it.
        storage.flush().await.unwrap();

        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        // A key that was never written must come back as None.
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }

    // An unbounded scan through the reader must return every record, in key order.
    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")),
                Record::new(Bytes::from("b"), Bytes::from("2")),
                Record::new(Bytes::from("c"), Bytes::from("3")),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }

    // Writer keeps writing after a reader is opened on the same store; both
    // the reader's read and the writer's subsequent put/flush must succeed.
    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![Record::new(
                Bytes::from("key1"),
                Bytes::from("value1"),
            )])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Writing again while the reader is still open must not fence the writer.
        storage
            .put(vec![Record::new(
                Bytes::from("key2"),
                Bytes::from("value2"),
            )])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }
}