// common/storage/slate.rs
1use std::sync::Arc;
2
3use crate::storage::factory::OwnedHybridCache;
4use crate::storage::{MergeOptions, PutOptions};
5use crate::{
6    BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
7    storage::{
8        MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
9        WriteOptions, WriteResult,
10    },
11};
12use async_trait::async_trait;
13use bytes::Bytes;
14use slatedb::IterationOrder;
15use slatedb::config::ScanOptions;
16use slatedb::{
17    Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
18    MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
19};
20use tokio::sync::watch;
21use tracing::warn;
22
/// Adapter that wraps our `MergeOperator` trait to implement SlateDB's `MergeOperator` trait.
///
/// This allows using our common merge operator interface with SlateDB's merge functionality.
pub struct SlateDbMergeOperatorAdapter {
    // The wrapped common merge operator; both SlateDB merge entry points
    // forward to its `merge_batch`.
    operator: Arc<dyn MergeOperator>,
}
29
impl SlateDbMergeOperatorAdapter {
    /// Wraps the given common merge operator so it can be handed to SlateDB.
    fn new(operator: Arc<dyn MergeOperator>) -> Self {
        Self { operator }
    }
}
35
36impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
37    fn merge(
38        &self,
39        key: &Bytes,
40        existing_value: Option<Bytes>,
41        value: Bytes,
42    ) -> Result<Bytes, MergeOperatorError> {
43        Ok(self.operator.merge_batch(key, existing_value, &[value]))
44    }
45
46    fn merge_batch(
47        &self,
48        key: &Bytes,
49        existing_value: Option<Bytes>,
50        operands: &[Bytes],
51    ) -> Result<Bytes, MergeOperatorError> {
52        if operands.is_empty() && existing_value.is_none() {
53            return Err(MergeOperatorError::EmptyBatch);
54        }
55        Ok(self.operator.merge_batch(key, existing_value, operands))
56    }
57}
58
59/// Returns the default scan options used for storage scans.
60fn default_scan_options() -> ScanOptions {
61    ScanOptions {
62        durability_filter: Default::default(),
63        dirty: false,
64        read_ahead_bytes: 1024 * 1024,
65        cache_blocks: true,
66        max_fetch_tasks: 4,
67        order: IterationOrder::Ascending,
68    }
69}
70
/// SlateDB-backed implementation of the Storage trait.
///
/// SlateDB is an embedded key-value store built on object storage, providing
/// LSM-tree semantics with cloud-native durability.
pub struct SlateDbStorage {
    // Underlying SlateDB handle; `pub(super)` so sibling storage modules (and
    // the tests below) can reach the raw database.
    pub(super) db: Arc<Db>,
    // Fans SlateDB's durable sequence number out to `subscribe_durable` callers.
    durable_tx: watch::Sender<u64>,
    // Handle used by `close()` to stop the background task that bridges
    // SlateDB's status watch into `durable_tx`.
    durable_bridge_abort: tokio::task::AbortHandle,
    // TODO(slatedb 0.13): remove once DbCache exposes a close() hook and
    // SlateDb::close() drives cache shutdown itself.
    managed_cache: Option<OwnedHybridCache>,
}
83
84impl SlateDbStorage {
85    /// Creates a new SlateDbStorage instance wrapping the given SlateDB database.
86    pub fn new(db: Arc<Db>) -> Self {
87        Self::new_with_managed_cache(db, None)
88    }
89
90    /// Creates a new SlateDbStorage that will explicitly close the given foyer
91    /// hybrid cache during [`StorageRead::close`]. See [`OwnedHybridCache`] for
92    /// why this exists.
93    pub(crate) fn new_with_managed_cache(
94        db: Arc<Db>,
95        managed_cache: Option<OwnedHybridCache>,
96    ) -> Self {
97        let slate_rx = db.subscribe();
98        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
99        let task = tokio::spawn({
100            let tx = durable_tx.clone();
101            async move {
102                let mut slate_rx = slate_rx;
103                while slate_rx.changed().await.is_ok() {
104                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
105                    if tx.send(durable_seq).is_err() {
106                        break;
107                    }
108                }
109            }
110        });
111
112        Self {
113            db,
114            durable_tx,
115            durable_bridge_abort: task.abort_handle(),
116            managed_cache,
117        }
118    }
119
120    /// Creates a SlateDB `MergeOperator` from our common `MergeOperator` trait.
121    ///
122    /// This adapter can be used when constructing a SlateDB database with a merge operator:
123    /// ```rust,ignore
124    /// use common::storage::MergeOperator;
125    /// use slatedb::{DbBuilder, object_store::ObjectStore};
126    ///
127    /// let my_merge_op: Arc<dyn MergeOperator> = Arc::new(MyMergeOperator);
128    /// let slate_merge_op = SlateDbStorage::merge_operator_adapter(my_merge_op);
129    ///
130    /// let db = DbBuilder::new("path", object_store)
131    ///     .with_merge_operator(Arc::new(slate_merge_op))
132    ///     .build()
133    ///     .await?;
134    /// ```
135    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
136        SlateDbMergeOperatorAdapter::new(operator)
137    }
138}
139
140#[async_trait]
141impl StorageRead for SlateDbStorage {
142    /// Retrieves a single record by key from SlateDB.
143    ///
144    /// Returns `None` if the key does not exist.
145    #[tracing::instrument(level = "trace", skip_all)]
146    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
147        let value = self
148            .db
149            .get(&key)
150            .await
151            .map_err(StorageError::from_storage)?;
152
153        match value {
154            Some(v) => Ok(Some(Record::new(key, v))),
155            None => Ok(None),
156        }
157    }
158
159    #[tracing::instrument(level = "trace", skip_all)]
160    async fn scan_iter(
161        &self,
162        range: BytesRange,
163    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
164        let iter = self
165            .db
166            .scan_with_options(range, &default_scan_options())
167            .await
168            .map_err(StorageError::from_storage)?;
169        Ok(Box::new(SlateDbIterator { iter }))
170    }
171
172    async fn close(&self) -> StorageResult<()> {
173        // Stop durable bridge first so no status subscriber outlives DB close.
174        self.durable_bridge_abort.abort();
175        self.db.close().await.map_err(StorageError::from_storage)?;
176        // Close the cache after SlateDB so no inserts race the flushers. Log
177        // and swallow errors: cache flush failures only cost cache warmth, not
178        // durability, and we'd rather not fail shutdown over it.
179        // TODO(slatedb 0.13): remove once DbCache owns its close lifecycle.
180        if let Some(cache) = &self.managed_cache
181            && let Err(e) = cache.close().await
182        {
183            warn!(error = ?e, "foyer hybrid cache close failed");
184        }
185        Ok(())
186    }
187}
188
/// Adapter exposing a SlateDB `DbIterator` as a [`StorageIterator`].
///
/// Used by the scan paths of `SlateDbStorage`, `SlateDbStorageSnapshot` and
/// `SlateDbStorageReader` alike.
pub(super) struct SlateDbIterator {
    // Underlying SlateDB iterator driving `next()`.
    iter: DbIterator,
}
192
193#[async_trait]
194impl StorageIterator for SlateDbIterator {
195    #[tracing::instrument(level = "trace", skip_all)]
196    async fn next(&mut self) -> StorageResult<Option<Record>> {
197        match self.iter.next().await.map_err(StorageError::from_storage)? {
198            Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
199            None => Ok(None),
200        }
201    }
202}
203
/// SlateDB snapshot wrapper that implements StorageSnapshot.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
pub struct SlateDbStorageSnapshot {
    // Underlying SlateDB snapshot; all reads are served from it.
    snapshot: Arc<DbSnapshot>,
}
210
211#[async_trait]
212impl StorageRead for SlateDbStorageSnapshot {
213    #[tracing::instrument(level = "trace", skip_all)]
214    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
215        let value = self
216            .snapshot
217            .get(&key)
218            .await
219            .map_err(StorageError::from_storage)?;
220
221        match value {
222            Some(v) => Ok(Some(Record::new(key, v))),
223            None => Ok(None),
224        }
225    }
226
227    #[tracing::instrument(level = "trace", skip_all)]
228    async fn scan_iter(
229        &self,
230        range: BytesRange,
231    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
232        let iter = self
233            .snapshot
234            .scan_with_options(range, &default_scan_options())
235            .await
236            .map_err(StorageError::from_storage)?;
237        Ok(Box::new(SlateDbIterator { iter }))
238    }
239}
240
// Marker impl: the snapshot type needs no behavior beyond `StorageRead`.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
243
244#[async_trait]
245impl Storage for SlateDbStorage {
246    async fn apply_with_options(
247        &self,
248        records: Vec<RecordOp>,
249        options: WriteOptions,
250    ) -> StorageResult<WriteResult> {
251        let mut batch = WriteBatch::new();
252        for op in records {
253            match op {
254                RecordOp::Put(op) => {
255                    batch.put_with_options(op.record.key, op.record.value, &op.options.into())
256                }
257                RecordOp::Merge(op) => {
258                    batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
259                }
260                RecordOp::Delete(key) => batch.delete(key),
261            }
262        }
263        let slate_options = SlateDbWriteOptions {
264            await_durable: options.await_durable,
265        };
266        let write_handle = self
267            .db
268            .write_with_options(batch, &slate_options)
269            .await
270            .map_err(StorageError::from_storage)?;
271        Ok(WriteResult {
272            seqnum: write_handle.seqnum(),
273        })
274    }
275
276    async fn put_with_options(
277        &self,
278        records: Vec<PutRecordOp>,
279        options: WriteOptions,
280    ) -> StorageResult<WriteResult> {
281        let mut batch = WriteBatch::new();
282        for op in records {
283            batch.put_with_options(op.record.key, op.record.value, &op.options.into());
284        }
285        let slate_options = SlateDbWriteOptions {
286            await_durable: options.await_durable,
287        };
288        let write_handle = self
289            .db
290            .write_with_options(batch, &slate_options)
291            .await
292            .map_err(StorageError::from_storage)?;
293        Ok(WriteResult {
294            seqnum: write_handle.seqnum(),
295        })
296    }
297
298    async fn merge_with_options(
299        &self,
300        records: Vec<MergeRecordOp>,
301        options: WriteOptions,
302    ) -> StorageResult<WriteResult> {
303        let mut batch = WriteBatch::new();
304        for op in records {
305            batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
306        }
307        let slate_options = SlateDbWriteOptions {
308            await_durable: options.await_durable,
309        };
310        let write_handle = self
311            .db
312            .write_with_options(batch, &slate_options)
313            .await
314            .map_err(|e| {
315                let error_msg = e.to_string();
316                if error_msg.contains("merge operator") || error_msg.contains("not configured") {
317                    StorageError::Storage(
318                        "Merge operator not configured for this database".to_string(),
319                    )
320                } else {
321                    StorageError::from_storage(e)
322                }
323            })?;
324        Ok(WriteResult {
325            seqnum: write_handle.seqnum(),
326        })
327    }
328
329    fn subscribe_durable(&self) -> watch::Receiver<u64> {
330        self.durable_tx.subscribe()
331    }
332
333    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
334        let snapshot = self
335            .db
336            .snapshot()
337            .await
338            .map_err(StorageError::from_storage)?;
339        Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
340    }
341
342    async fn flush(&self) -> StorageResult<()> {
343        self.db.flush().await.map_err(StorageError::from_storage)?;
344        Ok(())
345    }
346}
347
/// Variant-for-variant mapping of our TTL enum onto SlateDB's TTL enum.
impl From<Ttl> for slatedb::config::Ttl {
    fn from(value: Ttl) -> Self {
        match value {
            Ttl::Default => slatedb::config::Ttl::Default,
            Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
            Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
        }
    }
}
357
/// Converts our put options into SlateDB's, carrying the TTL across.
impl From<PutOptions> for slatedb::config::PutOptions {
    fn from(value: PutOptions) -> Self {
        Self {
            ttl: value.ttl.into(),
        }
    }
}
365
/// Converts our merge options into SlateDB's, carrying the TTL across.
impl From<MergeOptions> for slatedb::config::MergeOptions {
    fn from(value: MergeOptions) -> Self {
        Self {
            ttl: value.ttl.into(),
        }
    }
}
373
/// Read-only SlateDB storage using `DbReader`.
///
/// This struct provides read-only access to a SlateDB database without fencing,
/// allowing multiple readers to coexist with a single writer.
pub struct SlateDbStorageReader {
    // Underlying read-only SlateDB handle; serves all gets and scans.
    reader: Arc<DbReader>,
    // TODO(slatedb 0.13): remove once DbCache exposes a close() hook and
    // DbReader::close() drives cache shutdown itself.
    managed_cache: Option<OwnedHybridCache>,
}
384
impl SlateDbStorageReader {
    /// Creates a new SlateDbStorageReader wrapping the given DbReader.
    ///
    /// No managed cache: `close()` will only close the reader itself.
    pub fn new(reader: Arc<DbReader>) -> Self {
        Self::new_with_managed_cache(reader, None)
    }

    /// Creates a reader that will explicitly close the given foyer hybrid
    /// cache during [`StorageRead::close`].
    pub(crate) fn new_with_managed_cache(
        reader: Arc<DbReader>,
        managed_cache: Option<OwnedHybridCache>,
    ) -> Self {
        Self {
            reader,
            managed_cache,
        }
    }
}
403
404#[async_trait]
405impl StorageRead for SlateDbStorageReader {
406    #[tracing::instrument(level = "trace", skip_all)]
407    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
408        let value = self
409            .reader
410            .get(&key)
411            .await
412            .map_err(StorageError::from_storage)?;
413
414        match value {
415            Some(v) => Ok(Some(Record::new(key, v))),
416            None => Ok(None),
417        }
418    }
419
420    #[tracing::instrument(level = "trace", skip_all)]
421    async fn scan_iter(
422        &self,
423        range: BytesRange,
424    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
425        let iter = self
426            .reader
427            .scan_with_options(range, &default_scan_options())
428            .await
429            .map_err(StorageError::from_storage)?;
430        Ok(Box::new(SlateDbIterator { iter }))
431    }
432
433    async fn close(&self) -> StorageResult<()> {
434        self.reader
435            .close()
436            .await
437            .map_err(StorageError::from_storage)?;
438        // Close the cache after the reader so no inserts race the flushers.
439        // TODO(slatedb 0.13): remove once DbCache owns its close lifecycle.
440        if let Some(cache) = &self.managed_cache
441            && let Err(e) = cache.close().await
442        {
443            warn!(error = ?e, "foyer hybrid cache close failed");
444        }
445        Ok(())
446    }
447}
448
#[cfg(test)]
mod tests {
    // Integration-style tests running SlateDB against an in-memory object
    // store: writer/reader visibility, TTL expiry stamps, merge operators,
    // and the await_durable write-option semantics.
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::config::Settings;
    use slatedb::object_store::memory::InMemory;
    use slatedb_common::clock::MockSystemClock;

    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader and verify data
        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        // Missing keys come back as Ok(None), not an error.
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader and scan data
        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        // Scan yields all keys in ascending order.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write initial data
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader while writer is still open - this should NOT cause fencing error
        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // Reader can read the data
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Writer can still write more data
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_set_expire_ts_based_on_ttl() {
        // given - storage configured with a 30 second default TTL
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        // Mock clock pins "now" at time=0 so expire_ts values are deterministic.
        let clock = Arc::new(MockSystemClock::new());

        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - key1 has expire_ts = 20_000 (time=0 + 20s TTL)
        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(kv1.expire_ts, Some(20_000));

        // then - key2 has expire_ts = 30_000 (time=0 + 30s default TTL)
        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(kv2.expire_ts, Some(30_000));

        // then - key3 has no expire_ts (NoExpiry)
        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(kv3.expire_ts, None);

        storage.close().await.unwrap();
    }

    /// Simple merge operator that concatenates existing and new values.
    struct ConcatMergeOperator;

    impl MergeOperator for ConcatMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            // Start from the existing value (empty if absent) and append every
            // operand in order.
            let mut result = existing_value.unwrap_or_default().to_vec();
            for operand in operands {
                result.extend_from_slice(operand);
            }
            Bytes::from(result)
        }
    }

    #[tokio::test]
    async fn should_set_expire_ts_on_merge_records_based_on_ttl() {
        // given - storage configured with a 30 second default TTL and a merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Merge three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - key1 has expire_ts = 20_000 (time=0 + 20s TTL)
        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(kv1.value, Bytes::from("v1"));
        assert_eq!(kv1.expire_ts, Some(20_000));

        // then - key2 has expire_ts = 30_000 (time=0 + 30s default TTL)
        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(kv2.value, Bytes::from("v2"));
        assert_eq!(kv2.expire_ts, Some(30_000));

        // then - key3 has no expire_ts (NoExpiry)
        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(kv3.value, Bytes::from("v3"));
        assert_eq!(kv3.expire_ts, None);

        storage.close().await.unwrap();
    }

    /// Helper: open a DbReader against the same path/object_store and try to
    /// read a key. Returns `true` if the key is present.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    /// Like `reader_can_see`, but optionally configures the reader with a
    /// merge operator (required when reading merge-written data).
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let mut builder = DbReader::builder(path, object_store);
        if let Some(op) = merge_op {
            builder = builder.with_merge_operator(op);
        }
        let reader = builder.build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }

    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // put() uses WriteOptions::default() which is await_durable: false
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Data is in memtable only — a reader (which reads from durable state) should NOT see it
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After explicit flush, reader can see it
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write with await_durable: true — should be flushed before returning
        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // Reader should see it immediately without explicit flush
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // apply() delegates with WriteOptions::default() (await_durable: false)
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .apply_with_options(
                vec![RecordOp::Put(
                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                )],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // merge() delegates with WriteOptions::default() (await_durable: false)
        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        assert!(
            !reader_can_see_with_merge_op(
                path,
                object_store.clone(),
                "k1",
                Some(reader_merge_op.clone()),
            )
            .await
        );

        storage.flush().await.unwrap();
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_durable";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .merge_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }
}