//! SlateDB-backed implementations of the common storage traits
//! (`common/storage/slate.rs`).
1use std::sync::Arc;
2
3use crate::storage::factory::OwnedHybridCache;
4use crate::storage::{MergeOptions, PutOptions};
5use crate::{
6    BytesRange, CheckpointInfo, Record, StorageError, StorageIterator, StorageRead, StorageResult,
7    Ttl,
8    storage::{
9        MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
10        WriteOptions, WriteResult,
11    },
12};
13use async_trait::async_trait;
14use bytes::Bytes;
15use slatedb::IterationOrder;
16use slatedb::config::{CheckpointOptions, CheckpointScope, ScanOptions};
17use slatedb::{
18    Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
19    MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
20};
21use tokio::sync::watch;
22use tracing::warn;
23
24/// Adapter that wraps our `MergeOperator` trait to implement SlateDB's `MergeOperator` trait.
25///
26/// This allows using our common merge operator interface with SlateDB's merge functionality.
27pub struct SlateDbMergeOperatorAdapter {
28    operator: Arc<dyn MergeOperator>,
29}
30
31impl SlateDbMergeOperatorAdapter {
32    fn new(operator: Arc<dyn MergeOperator>) -> Self {
33        Self { operator }
34    }
35}
36
37impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
38    fn merge(
39        &self,
40        key: &Bytes,
41        existing_value: Option<Bytes>,
42        value: Bytes,
43    ) -> Result<Bytes, MergeOperatorError> {
44        Ok(self.operator.merge_batch(key, existing_value, &[value]))
45    }
46
47    fn merge_batch(
48        &self,
49        key: &Bytes,
50        existing_value: Option<Bytes>,
51        operands: &[Bytes],
52    ) -> Result<Bytes, MergeOperatorError> {
53        if operands.is_empty() && existing_value.is_none() {
54            return Err(MergeOperatorError::EmptyBatch);
55        }
56        Ok(self.operator.merge_batch(key, existing_value, operands))
57    }
58}
59
60/// Returns the default scan options used for storage scans.
61fn default_scan_options() -> ScanOptions {
62    ScanOptions {
63        durability_filter: Default::default(),
64        dirty: false,
65        read_ahead_bytes: 1024 * 1024,
66        cache_blocks: true,
67        max_fetch_tasks: 4,
68        order: IterationOrder::Ascending,
69    }
70}
71
/// SlateDB-backed implementation of the Storage trait.
///
/// SlateDB is an embedded key-value store built on object storage, providing
/// LSM-tree semantics with cloud-native durability.
pub struct SlateDbStorage {
    /// Underlying SlateDB handle; `pub(super)` so sibling-module tests can
    /// inspect raw key/value state directly.
    pub(super) db: Arc<Db>,
    /// Publishes the latest durable sequence number to `subscribe_durable` callers.
    durable_tx: watch::Sender<u64>,
    /// Abort handle for the background task that bridges SlateDB's status
    /// watch into `durable_tx`; aborted in `close()`.
    durable_bridge_abort: tokio::task::AbortHandle,
    // TODO(slatedb 0.13): remove once DbCache exposes a close() hook and
    // SlateDb::close() drives cache shutdown itself.
    managed_cache: Option<OwnedHybridCache>,
}
84
85impl SlateDbStorage {
86    /// Creates a new SlateDbStorage instance wrapping the given SlateDB database.
87    pub fn new(db: Arc<Db>) -> Self {
88        Self::new_with_managed_cache(db, None)
89    }
90
91    /// Creates a new SlateDbStorage that will explicitly close the given foyer
92    /// hybrid cache during [`StorageRead::close`]. See [`OwnedHybridCache`] for
93    /// why this exists.
94    pub(crate) fn new_with_managed_cache(
95        db: Arc<Db>,
96        managed_cache: Option<OwnedHybridCache>,
97    ) -> Self {
98        let slate_rx = db.subscribe();
99        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
100        let task = tokio::spawn({
101            let tx = durable_tx.clone();
102            async move {
103                let mut slate_rx = slate_rx;
104                while slate_rx.changed().await.is_ok() {
105                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
106                    if tx.send(durable_seq).is_err() {
107                        break;
108                    }
109                }
110            }
111        });
112
113        Self {
114            db,
115            durable_tx,
116            durable_bridge_abort: task.abort_handle(),
117            managed_cache,
118        }
119    }
120
121    /// Creates a SlateDB `MergeOperator` from our common `MergeOperator` trait.
122    ///
123    /// This adapter can be used when constructing a SlateDB database with a merge operator:
124    /// ```rust,ignore
125    /// use common::storage::MergeOperator;
126    /// use slatedb::{DbBuilder, object_store::ObjectStore};
127    ///
128    /// let my_merge_op: Arc<dyn MergeOperator> = Arc::new(MyMergeOperator);
129    /// let slate_merge_op = SlateDbStorage::merge_operator_adapter(my_merge_op);
130    ///
131    /// let db = DbBuilder::new("path", object_store)
132    ///     .with_merge_operator(Arc::new(slate_merge_op))
133    ///     .build()
134    ///     .await?;
135    /// ```
136    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
137        SlateDbMergeOperatorAdapter::new(operator)
138    }
139}
140
141#[async_trait]
142impl StorageRead for SlateDbStorage {
143    /// Retrieves a single record by key from SlateDB.
144    ///
145    /// Returns `None` if the key does not exist.
146    #[tracing::instrument(level = "trace", skip_all)]
147    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
148        let value = self
149            .db
150            .get(&key)
151            .await
152            .map_err(StorageError::from_storage)?;
153
154        match value {
155            Some(v) => Ok(Some(Record::new(key, v))),
156            None => Ok(None),
157        }
158    }
159
160    #[tracing::instrument(level = "trace", skip_all)]
161    async fn scan_iter(
162        &self,
163        range: BytesRange,
164    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
165        let iter = self
166            .db
167            .scan_with_options(range, &default_scan_options())
168            .await
169            .map_err(StorageError::from_storage)?;
170        Ok(Box::new(SlateDbIterator { iter }))
171    }
172
173    async fn close(&self) -> StorageResult<()> {
174        // Stop durable bridge first so no status subscriber outlives DB close.
175        self.durable_bridge_abort.abort();
176        self.db.close().await.map_err(StorageError::from_storage)?;
177        // Close the cache after SlateDB so no inserts race the flushers. Log
178        // and swallow errors: cache flush failures only cost cache warmth, not
179        // durability, and we'd rather not fail shutdown over it.
180        // TODO(slatedb 0.13): remove once DbCache owns its close lifecycle.
181        if let Some(cache) = &self.managed_cache
182            && let Err(e) = cache.close().await
183        {
184            warn!(error = ?e, "foyer hybrid cache close failed");
185        }
186        Ok(())
187    }
188}
189
/// Adapts a SlateDB `DbIterator` to the common `StorageIterator` trait.
pub(super) struct SlateDbIterator {
    // Underlying SlateDB iterator; used for Db, DbSnapshot, and DbReader scans.
    iter: DbIterator,
}
193
194#[async_trait]
195impl StorageIterator for SlateDbIterator {
196    #[tracing::instrument(level = "trace", skip_all)]
197    async fn next(&mut self) -> StorageResult<Option<Record>> {
198        match self.iter.next().await.map_err(StorageError::from_storage)? {
199            Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
200            None => Ok(None),
201        }
202    }
203}
204
/// SlateDB snapshot wrapper that implements StorageSnapshot.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
pub struct SlateDbStorageSnapshot {
    // Shared handle to the underlying SlateDB snapshot.
    snapshot: Arc<DbSnapshot>,
}
211
212#[async_trait]
213impl StorageRead for SlateDbStorageSnapshot {
214    #[tracing::instrument(level = "trace", skip_all)]
215    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
216        let value = self
217            .snapshot
218            .get(&key)
219            .await
220            .map_err(StorageError::from_storage)?;
221
222        match value {
223            Some(v) => Ok(Some(Record::new(key, v))),
224            None => Ok(None),
225        }
226    }
227
228    #[tracing::instrument(level = "trace", skip_all)]
229    async fn scan_iter(
230        &self,
231        range: BytesRange,
232    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
233        let iter = self
234            .snapshot
235            .scan_with_options(range, &default_scan_options())
236            .await
237            .map_err(StorageError::from_storage)?;
238        Ok(Box::new(SlateDbIterator { iter }))
239    }
240}
241
// Marker impl: snapshot reads come from the `StorageRead` implementation
// above; no additional methods are overridden here.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
244
245#[async_trait]
246impl Storage for SlateDbStorage {
247    async fn apply_with_options(
248        &self,
249        records: Vec<RecordOp>,
250        options: WriteOptions,
251    ) -> StorageResult<WriteResult> {
252        let mut batch = WriteBatch::new();
253        for op in records {
254            match op {
255                RecordOp::Put(op) => {
256                    batch.put_with_options(op.record.key, op.record.value, &op.options.into())
257                }
258                RecordOp::Merge(op) => {
259                    batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
260                }
261                RecordOp::Delete(key) => batch.delete(key),
262            }
263        }
264        let slate_options = SlateDbWriteOptions {
265            await_durable: options.await_durable,
266        };
267        let write_handle = self
268            .db
269            .write_with_options(batch, &slate_options)
270            .await
271            .map_err(StorageError::from_storage)?;
272        Ok(WriteResult {
273            seqnum: write_handle.seqnum(),
274        })
275    }
276
277    async fn put_with_options(
278        &self,
279        records: Vec<PutRecordOp>,
280        options: WriteOptions,
281    ) -> StorageResult<WriteResult> {
282        let mut batch = WriteBatch::new();
283        for op in records {
284            batch.put_with_options(op.record.key, op.record.value, &op.options.into());
285        }
286        let slate_options = SlateDbWriteOptions {
287            await_durable: options.await_durable,
288        };
289        let write_handle = self
290            .db
291            .write_with_options(batch, &slate_options)
292            .await
293            .map_err(StorageError::from_storage)?;
294        Ok(WriteResult {
295            seqnum: write_handle.seqnum(),
296        })
297    }
298
299    async fn merge_with_options(
300        &self,
301        records: Vec<MergeRecordOp>,
302        options: WriteOptions,
303    ) -> StorageResult<WriteResult> {
304        let mut batch = WriteBatch::new();
305        for op in records {
306            batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
307        }
308        let slate_options = SlateDbWriteOptions {
309            await_durable: options.await_durable,
310        };
311        let write_handle = self
312            .db
313            .write_with_options(batch, &slate_options)
314            .await
315            .map_err(|e| {
316                let error_msg = e.to_string();
317                if error_msg.contains("merge operator") || error_msg.contains("not configured") {
318                    StorageError::Storage(
319                        "Merge operator not configured for this database".to_string(),
320                    )
321                } else {
322                    StorageError::from_storage(e)
323                }
324            })?;
325        Ok(WriteResult {
326            seqnum: write_handle.seqnum(),
327        })
328    }
329
330    fn subscribe_durable(&self) -> watch::Receiver<u64> {
331        self.durable_tx.subscribe()
332    }
333
334    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
335        let snapshot = self
336            .db
337            .snapshot()
338            .await
339            .map_err(StorageError::from_storage)?;
340        Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
341    }
342
343    async fn flush(&self) -> StorageResult<()> {
344        self.db.flush().await.map_err(StorageError::from_storage)?;
345        Ok(())
346    }
347
348    async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo> {
349        let result = self
350            .db
351            .create_checkpoint(CheckpointScope::All, &CheckpointOptions::default())
352            .await
353            .map_err(StorageError::from_storage)?;
354        Ok(CheckpointInfo {
355            id: result.id,
356            manifest_id: result.manifest_id,
357        })
358    }
359}
360
361impl From<Ttl> for slatedb::config::Ttl {
362    fn from(value: Ttl) -> Self {
363        match value {
364            Ttl::Default => slatedb::config::Ttl::Default,
365            Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
366            Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
367            Ttl::ExpireAt(ts) => slatedb::config::Ttl::ExpireAt(ts),
368        }
369    }
370}
371
372impl From<PutOptions> for slatedb::config::PutOptions {
373    fn from(value: PutOptions) -> Self {
374        Self {
375            ttl: value.ttl.into(),
376        }
377    }
378}
379
380impl From<MergeOptions> for slatedb::config::MergeOptions {
381    fn from(value: MergeOptions) -> Self {
382        Self {
383            ttl: value.ttl.into(),
384        }
385    }
386}
387
/// Read-only SlateDB storage using `DbReader`.
///
/// This struct provides read-only access to a SlateDB database without fencing,
/// allowing multiple readers to coexist with a single writer.
pub struct SlateDbStorageReader {
    /// Underlying read-only SlateDB handle.
    reader: Arc<DbReader>,
    // TODO(slatedb 0.13): remove once DbCache exposes a close() hook and
    // DbReader::close() drives cache shutdown itself.
    managed_cache: Option<OwnedHybridCache>,
}
398
399impl SlateDbStorageReader {
400    /// Creates a new SlateDbStorageReader wrapping the given DbReader.
401    pub fn new(reader: Arc<DbReader>) -> Self {
402        Self::new_with_managed_cache(reader, None)
403    }
404
405    /// Creates a reader that will explicitly close the given foyer hybrid
406    /// cache during [`StorageRead::close`].
407    pub(crate) fn new_with_managed_cache(
408        reader: Arc<DbReader>,
409        managed_cache: Option<OwnedHybridCache>,
410    ) -> Self {
411        Self {
412            reader,
413            managed_cache,
414        }
415    }
416}
417
418#[async_trait]
419impl StorageRead for SlateDbStorageReader {
420    #[tracing::instrument(level = "trace", skip_all)]
421    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
422        let value = self
423            .reader
424            .get(&key)
425            .await
426            .map_err(StorageError::from_storage)?;
427
428        match value {
429            Some(v) => Ok(Some(Record::new(key, v))),
430            None => Ok(None),
431        }
432    }
433
434    #[tracing::instrument(level = "trace", skip_all)]
435    async fn scan_iter(
436        &self,
437        range: BytesRange,
438    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
439        let iter = self
440            .reader
441            .scan_with_options(range, &default_scan_options())
442            .await
443            .map_err(StorageError::from_storage)?;
444        Ok(Box::new(SlateDbIterator { iter }))
445    }
446
447    async fn close(&self) -> StorageResult<()> {
448        self.reader
449            .close()
450            .await
451            .map_err(StorageError::from_storage)?;
452        // Close the cache after the reader so no inserts race the flushers.
453        // TODO(slatedb 0.13): remove once DbCache owns its close lifecycle.
454        if let Some(cache) = &self.managed_cache
455            && let Err(e) = cache.close().await
456        {
457            warn!(error = ?e, "foyer hybrid cache close failed");
458        }
459        Ok(())
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::config::Settings;
    use slatedb::object_store::memory::InMemory;
    use slatedb_common::clock::MockSystemClock;

    // Writer and reader instances in these tests share the same in-memory
    // object store, so reader visibility reflects what has been made durable.
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader and verify data
        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        // Absent key resolves to None rather than an error.
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader and scan data
        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        // Scan yields all keys in ascending order (default scan options).
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write initial data
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader while writer is still open - this should NOT cause fencing error
        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // Reader can read the data
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Writer can still write more data
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_set_expire_ts_based_on_ttl() {
        // given - storage configured with a 30 second default TTL
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        // MockSystemClock pins "now" at time=0 so expire_ts values are exact.
        let clock = Arc::new(MockSystemClock::new());

        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - key1 has expire_ts = 20_000 (time=0 + 20s TTL)
        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(kv1.expire_ts, Some(20_000));

        // then - key2 has expire_ts = 30_000 (time=0 + 30s default TTL)
        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(kv2.expire_ts, Some(30_000));

        // then - key3 has no expire_ts (NoExpiry)
        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(kv3.expire_ts, None);

        storage.close().await.unwrap();
    }

    /// Simple merge operator that concatenates existing and new values.
    struct ConcatMergeOperator;

    impl MergeOperator for ConcatMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            // Start from the existing value (empty if absent) and append each
            // operand in order.
            let mut result = existing_value.unwrap_or_default().to_vec();
            for operand in operands {
                result.extend_from_slice(operand);
            }
            Bytes::from(result)
        }
    }

    #[tokio::test]
    async fn should_set_expire_ts_on_merge_records_based_on_ttl() {
        // given - storage configured with a 30 second default TTL and a merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Merge three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - key1 has expire_ts = 20_000 (time=0 + 20s TTL)
        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(kv1.value, Bytes::from("v1"));
        assert_eq!(kv1.expire_ts, Some(20_000));

        // then - key2 has expire_ts = 30_000 (time=0 + 30s default TTL)
        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(kv2.value, Bytes::from("v2"));
        assert_eq!(kv2.expire_ts, Some(30_000));

        // then - key3 has no expire_ts (NoExpiry)
        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(kv3.value, Bytes::from("v3"));
        assert_eq!(kv3.expire_ts, None);

        storage.close().await.unwrap();
    }

    /// Helper: open a DbReader against the same path/object_store and try to
    /// read a key. Returns `true` if the key is present.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    // Variant used by merge tests: the reader must be configured with the
    // same merge operator as the writer to resolve merge records.
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let mut builder = DbReader::builder(path, object_store);
        if let Some(op) = merge_op {
            builder = builder.with_merge_operator(op);
        }
        let reader = builder.build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }

    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // put() uses WriteOptions::default() which is await_durable: false
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Data is in memtable only — a reader (which reads from durable state) should NOT see it
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After explicit flush, reader can see it
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write with await_durable: true — should be flushed before returning
        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // Reader should see it immediately without explicit flush
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // apply() delegates with WriteOptions::default() (await_durable: false)
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .apply_with_options(
                vec![RecordOp::Put(
                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                )],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // merge() delegates with WriteOptions::default() (await_durable: false)
        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        assert!(
            !reader_can_see_with_merge_op(
                path,
                object_store.clone(),
                "k1",
                Some(reader_merge_op.clone()),
            )
            .await
        );

        storage.flush().await.unwrap();
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_durable";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .merge_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }
}