Skip to main content

common/storage/
slate.rs

1use std::sync::Arc;
2
3use crate::storage::factory::OwnedHybridCache;
4use crate::storage::{MergeOptions, PutOptions};
5use crate::{
6    BytesRange, CheckpointInfo, Record, StorageError, StorageIterator, StorageRead, StorageResult,
7    Ttl,
8    storage::{
9        MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
10        WriteOptions, WriteResult,
11    },
12};
13use async_trait::async_trait;
14use bytes::Bytes;
15use slatedb::IterationOrder;
16use slatedb::config::{CheckpointOptions, CheckpointScope, ScanOptions};
17use slatedb::{
18    Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
19    MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
20};
21use tokio::sync::watch;
22use tracing::warn;
23
/// Adapter that wraps our `MergeOperator` trait to implement SlateDB's `MergeOperator` trait.
///
/// This allows using our common merge operator interface with SlateDB's merge functionality.
/// Both SlateDB entry points (`merge` and `merge_batch`) are forwarded to the wrapped
/// operator's `merge_batch`.
pub struct SlateDbMergeOperatorAdapter {
    /// The wrapped operator that performs the actual merge.
    operator: Arc<dyn MergeOperator>,
}
30
31impl SlateDbMergeOperatorAdapter {
32    fn new(operator: Arc<dyn MergeOperator>) -> Self {
33        Self { operator }
34    }
35}
36
37impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
38    fn merge(
39        &self,
40        key: &Bytes,
41        existing_value: Option<Bytes>,
42        value: Bytes,
43    ) -> Result<Bytes, MergeOperatorError> {
44        Ok(self.operator.merge_batch(key, existing_value, &[value]))
45    }
46
47    fn merge_batch(
48        &self,
49        key: &Bytes,
50        existing_value: Option<Bytes>,
51        operands: &[Bytes],
52    ) -> Result<Bytes, MergeOperatorError> {
53        if operands.is_empty() && existing_value.is_none() {
54            return Err(MergeOperatorError::EmptyBatch);
55        }
56        Ok(self.operator.merge_batch(key, existing_value, operands))
57    }
58}
59
60/// Returns the default scan options used for storage scans.
61fn default_scan_options() -> ScanOptions {
62    ScanOptions {
63        durability_filter: Default::default(),
64        dirty: false,
65        read_ahead_bytes: 1024 * 1024,
66        cache_blocks: true,
67        max_fetch_tasks: 4,
68        order: IterationOrder::Ascending,
69        filter_context: None,
70    }
71}
72
/// SlateDB-backed implementation of the Storage trait.
///
/// SlateDB is an embedded key-value store built on object storage, providing
/// LSM-tree semantics with cloud-native durability.
pub struct SlateDbStorage {
    /// Handle to the wrapped SlateDB database.
    pub(super) db: Arc<Db>,
    /// Publishes the latest durable sequence number. Seeded from the DB status
    /// at construction and updated by the background bridge task; handed out
    /// to callers via `subscribe_durable`.
    durable_tx: watch::Sender<u64>,
    /// Abort handle for the background task that forwards SlateDB status
    /// changes into `durable_tx`; aborted when the storage is closed.
    durable_bridge_abort: tokio::task::AbortHandle,
    // TODO(slatedb 0.13): remove once DbCache exposes a close() hook and
    // SlateDb::close() drives cache shutdown itself.
    /// Optional foyer hybrid cache that this storage closes explicitly after
    /// the database has been closed.
    managed_cache: Option<OwnedHybridCache>,
}
85
86impl SlateDbStorage {
87    /// Creates a new SlateDbStorage instance wrapping the given SlateDB database.
88    pub fn new(db: Arc<Db>) -> Self {
89        Self::new_with_managed_cache(db, None)
90    }
91
92    /// Creates a new SlateDbStorage that will explicitly close the given foyer
93    /// hybrid cache during [`StorageRead::close`]. See [`OwnedHybridCache`] for
94    /// why this exists.
95    pub(crate) fn new_with_managed_cache(
96        db: Arc<Db>,
97        managed_cache: Option<OwnedHybridCache>,
98    ) -> Self {
99        let slate_rx = db.subscribe();
100        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
101        let task = tokio::spawn({
102            let tx = durable_tx.clone();
103            async move {
104                let mut slate_rx = slate_rx;
105                while slate_rx.changed().await.is_ok() {
106                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
107                    // Use send_replace rather than send: SlateDB may publish
108                    // a DbStatus change during `open` (e.g. manifest write)
109                    // before any consumer has subscribed, and `send` would
110                    // return Err on zero receivers — killing the bridge
111                    // permanently. send_replace doesn't care about receiver
112                    // count, so late subscribers still get future updates.
113                    tx.send_replace(durable_seq);
114                }
115            }
116        });
117
118        Self {
119            db,
120            durable_tx,
121            durable_bridge_abort: task.abort_handle(),
122            managed_cache,
123        }
124    }
125
126    /// Creates a SlateDB `MergeOperator` from our common `MergeOperator` trait.
127    ///
128    /// This adapter can be used when constructing a SlateDB database with a merge operator:
129    /// ```rust,ignore
130    /// use common::storage::MergeOperator;
131    /// use slatedb::{DbBuilder, object_store::ObjectStore};
132    ///
133    /// let my_merge_op: Arc<dyn MergeOperator> = Arc::new(MyMergeOperator);
134    /// let slate_merge_op = SlateDbStorage::merge_operator_adapter(my_merge_op);
135    ///
136    /// let db = DbBuilder::new("path", object_store)
137    ///     .with_merge_operator(Arc::new(slate_merge_op))
138    ///     .build()
139    ///     .await?;
140    /// ```
141    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
142        SlateDbMergeOperatorAdapter::new(operator)
143    }
144}
145
146#[async_trait]
147impl StorageRead for SlateDbStorage {
148    /// Retrieves a single record by key from SlateDB.
149    ///
150    /// Returns `None` if the key does not exist.
151    #[tracing::instrument(level = "trace", skip_all)]
152    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
153        let value = self
154            .db
155            .get(&key)
156            .await
157            .map_err(StorageError::from_storage)?;
158
159        match value {
160            Some(v) => Ok(Some(Record::new(key, v))),
161            None => Ok(None),
162        }
163    }
164
165    #[tracing::instrument(level = "trace", skip_all)]
166    async fn scan_iter(
167        &self,
168        range: BytesRange,
169    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
170        let iter = self
171            .db
172            .scan_with_options(range, &default_scan_options())
173            .await
174            .map_err(StorageError::from_storage)?;
175        Ok(Box::new(SlateDbIterator { iter }))
176    }
177
178    async fn close(&self) -> StorageResult<()> {
179        // Stop durable bridge first so no status subscriber outlives DB close.
180        self.durable_bridge_abort.abort();
181        self.db.close().await.map_err(StorageError::from_storage)?;
182        // Close the cache after SlateDB so no inserts race the flushers. Log
183        // and swallow errors: cache flush failures only cost cache warmth, not
184        // durability, and we'd rather not fail shutdown over it.
185        // TODO(slatedb 0.13): remove once DbCache owns its close lifecycle.
186        if let Some(cache) = &self.managed_cache
187            && let Err(e) = cache.close().await
188        {
189            warn!(error = ?e, "foyer hybrid cache close failed");
190        }
191        Ok(())
192    }
193}
194
/// `StorageIterator` implementation backed by a SlateDB `DbIterator`.
///
/// Created by the `scan_iter` implementations in this module.
pub(super) struct SlateDbIterator {
    /// The underlying SlateDB iterator that entries are pulled from.
    iter: DbIterator,
}
198
199#[async_trait]
200impl StorageIterator for SlateDbIterator {
201    #[tracing::instrument(level = "trace", skip_all)]
202    async fn next(&mut self) -> StorageResult<Option<Record>> {
203        match self.iter.next().await.map_err(StorageError::from_storage)? {
204            Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
205            None => Ok(None),
206        }
207    }
208}
209
/// SlateDB snapshot wrapper that implements StorageSnapshot.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
pub struct SlateDbStorageSnapshot {
    /// The wrapped SlateDB snapshot that all reads are served from.
    snapshot: Arc<DbSnapshot>,
}
216
217#[async_trait]
218impl StorageRead for SlateDbStorageSnapshot {
219    #[tracing::instrument(level = "trace", skip_all)]
220    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
221        let value = self
222            .snapshot
223            .get(&key)
224            .await
225            .map_err(StorageError::from_storage)?;
226
227        match value {
228            Some(v) => Ok(Some(Record::new(key, v))),
229            None => Ok(None),
230        }
231    }
232
233    #[tracing::instrument(level = "trace", skip_all)]
234    async fn scan_iter(
235        &self,
236        range: BytesRange,
237    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
238        let iter = self
239            .snapshot
240            .scan_with_options(range, &default_scan_options())
241            .await
242            .map_err(StorageError::from_storage)?;
243        Ok(Box::new(SlateDbIterator { iter }))
244    }
245}
246
// Empty impl: the snapshot adds nothing beyond what the `StorageSnapshot`
// trait itself provides on top of `StorageRead`.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
249
250#[async_trait]
251impl Storage for SlateDbStorage {
252    async fn apply_with_options(
253        &self,
254        records: Vec<RecordOp>,
255        options: WriteOptions,
256    ) -> StorageResult<WriteResult> {
257        let mut batch = WriteBatch::new();
258        for op in records {
259            match op {
260                RecordOp::Put(op) => {
261                    batch.put_with_options(op.record.key, op.record.value, &op.options.into())
262                }
263                RecordOp::Merge(op) => {
264                    batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
265                }
266                RecordOp::Delete(key) => batch.delete(key),
267            }
268        }
269        let slate_options = SlateDbWriteOptions {
270            await_durable: options.await_durable,
271            ..SlateDbWriteOptions::default()
272        };
273        let write_handle = self
274            .db
275            .write_with_options(batch, &slate_options)
276            .await
277            .map_err(StorageError::from_storage)?;
278        Ok(WriteResult {
279            seqnum: write_handle.seqnum(),
280        })
281    }
282
283    async fn put_with_options(
284        &self,
285        records: Vec<PutRecordOp>,
286        options: WriteOptions,
287    ) -> StorageResult<WriteResult> {
288        let mut batch = WriteBatch::new();
289        for op in records {
290            batch.put_with_options(op.record.key, op.record.value, &op.options.into());
291        }
292        let slate_options = SlateDbWriteOptions {
293            await_durable: options.await_durable,
294            ..SlateDbWriteOptions::default()
295        };
296        let write_handle = self
297            .db
298            .write_with_options(batch, &slate_options)
299            .await
300            .map_err(StorageError::from_storage)?;
301        Ok(WriteResult {
302            seqnum: write_handle.seqnum(),
303        })
304    }
305
306    async fn merge_with_options(
307        &self,
308        records: Vec<MergeRecordOp>,
309        options: WriteOptions,
310    ) -> StorageResult<WriteResult> {
311        let mut batch = WriteBatch::new();
312        for op in records {
313            batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
314        }
315        let slate_options = SlateDbWriteOptions {
316            await_durable: options.await_durable,
317            ..SlateDbWriteOptions::default()
318        };
319        let write_handle = self
320            .db
321            .write_with_options(batch, &slate_options)
322            .await
323            .map_err(|e| {
324                let error_msg = e.to_string();
325                if error_msg.contains("merge operator") || error_msg.contains("not configured") {
326                    StorageError::Storage(
327                        "Merge operator not configured for this database".to_string(),
328                    )
329                } else {
330                    StorageError::from_storage(e)
331                }
332            })?;
333        Ok(WriteResult {
334            seqnum: write_handle.seqnum(),
335        })
336    }
337
338    fn subscribe_durable(&self) -> watch::Receiver<u64> {
339        self.durable_tx.subscribe()
340    }
341
342    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
343        let snapshot = self
344            .db
345            .snapshot()
346            .await
347            .map_err(StorageError::from_storage)?;
348        Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
349    }
350
351    async fn flush(&self) -> StorageResult<()> {
352        self.db.flush().await.map_err(StorageError::from_storage)?;
353        Ok(())
354    }
355
356    async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo> {
357        let result = self
358            .db
359            .create_checkpoint(CheckpointScope::All, &CheckpointOptions::default())
360            .await
361            .map_err(StorageError::from_storage)?;
362        Ok(CheckpointInfo {
363            id: result.id,
364            manifest_id: result.manifest_id,
365        })
366    }
367}
368
369impl From<Ttl> for slatedb::config::Ttl {
370    fn from(value: Ttl) -> Self {
371        match value {
372            Ttl::Default => slatedb::config::Ttl::Default,
373            Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
374            Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
375            Ttl::ExpireAt(ts) => slatedb::config::Ttl::ExpireAt(ts),
376        }
377    }
378}
379
380impl From<PutOptions> for slatedb::config::PutOptions {
381    fn from(value: PutOptions) -> Self {
382        Self {
383            ttl: value.ttl.into(),
384        }
385    }
386}
387
388impl From<MergeOptions> for slatedb::config::MergeOptions {
389    fn from(value: MergeOptions) -> Self {
390        Self {
391            ttl: value.ttl.into(),
392        }
393    }
394}
395
/// Read-only SlateDB storage using `DbReader`.
///
/// This struct provides read-only access to a SlateDB database without fencing,
/// allowing multiple readers to coexist with a single writer.
pub struct SlateDbStorageReader {
    /// The wrapped SlateDB reader that all reads are served from.
    reader: Arc<DbReader>,
    // TODO(slatedb 0.13): remove once DbCache exposes a close() hook and
    // DbReader::close() drives cache shutdown itself.
    /// Optional foyer hybrid cache that is closed explicitly after the reader
    /// has been closed.
    managed_cache: Option<OwnedHybridCache>,
}
406
407impl SlateDbStorageReader {
408    /// Creates a new SlateDbStorageReader wrapping the given DbReader.
409    pub fn new(reader: Arc<DbReader>) -> Self {
410        Self::new_with_managed_cache(reader, None)
411    }
412
413    /// Creates a reader that will explicitly close the given foyer hybrid
414    /// cache during [`StorageRead::close`].
415    pub(crate) fn new_with_managed_cache(
416        reader: Arc<DbReader>,
417        managed_cache: Option<OwnedHybridCache>,
418    ) -> Self {
419        Self {
420            reader,
421            managed_cache,
422        }
423    }
424}
425
426#[async_trait]
427impl StorageRead for SlateDbStorageReader {
428    #[tracing::instrument(level = "trace", skip_all)]
429    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
430        let value = self
431            .reader
432            .get(&key)
433            .await
434            .map_err(StorageError::from_storage)?;
435
436        match value {
437            Some(v) => Ok(Some(Record::new(key, v))),
438            None => Ok(None),
439        }
440    }
441
442    #[tracing::instrument(level = "trace", skip_all)]
443    async fn scan_iter(
444        &self,
445        range: BytesRange,
446    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
447        let iter = self
448            .reader
449            .scan_with_options(range, &default_scan_options())
450            .await
451            .map_err(StorageError::from_storage)?;
452        Ok(Box::new(SlateDbIterator { iter }))
453    }
454
455    async fn close(&self) -> StorageResult<()> {
456        self.reader
457            .close()
458            .await
459            .map_err(StorageError::from_storage)?;
460        // Close the cache after the reader so no inserts race the flushers.
461        // TODO(slatedb 0.13): remove once DbCache owns its close lifecycle.
462        if let Some(cache) = &self.managed_cache
463            && let Err(e) = cache.close().await
464        {
465            warn!(error = ?e, "foyer hybrid cache close failed");
466        }
467        Ok(())
468    }
469}
470
471#[cfg(test)]
472mod tests {
473    use super::*;
474    use crate::BytesRange;
475    use slatedb::DbBuilder;
476    use slatedb::config::Settings;
477    use slatedb::object_store::memory::InMemory;
478    use slatedb_common::clock::MockSystemClock;
479
480    #[tokio::test]
481    async fn should_read_data_written_by_storage_via_reader() {
482        let object_store = Arc::new(InMemory::new());
483        let path = "/test/db";
484
485        // Create writer and write data
486        let db = DbBuilder::new(path, object_store.clone())
487            .build()
488            .await
489            .unwrap();
490        let storage = SlateDbStorage::new(Arc::new(db));
491
492        storage
493            .put(vec![
494                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
495                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
496            ])
497            .await
498            .unwrap();
499        storage.flush().await.unwrap();
500
501        // Create reader and verify data
502        let reader = DbReader::builder(path, object_store.clone())
503            .build()
504            .await
505            .unwrap();
506        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
507
508        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
509        assert!(record.is_some());
510        assert_eq!(record.unwrap().value, Bytes::from("value1"));
511
512        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
513        assert!(record.is_some());
514        assert_eq!(record.unwrap().value, Bytes::from("value2"));
515
516        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
517        assert!(record.is_none());
518
519        storage.close().await.unwrap();
520    }
521
522    #[tokio::test]
523    async fn should_scan_data_written_by_storage_via_reader() {
524        let object_store = Arc::new(InMemory::new());
525        let path = "/test/db";
526
527        // Create writer and write data
528        let db = DbBuilder::new(path, object_store.clone())
529            .build()
530            .await
531            .unwrap();
532        let storage = SlateDbStorage::new(Arc::new(db));
533
534        storage
535            .put(vec![
536                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
537                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
538                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
539            ])
540            .await
541            .unwrap();
542        storage.flush().await.unwrap();
543
544        // Create reader and scan data
545        let reader = DbReader::builder(path, object_store.clone())
546            .build()
547            .await
548            .unwrap();
549        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
550
551        let mut iter = storage_reader
552            .scan_iter(BytesRange::unbounded())
553            .await
554            .unwrap();
555        let mut results = Vec::new();
556        while let Some(record) = iter.next().await.unwrap() {
557            results.push((record.key, record.value));
558        }
559
560        assert_eq!(results.len(), 3);
561        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
562        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
563        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));
564
565        storage.close().await.unwrap();
566    }
567
568    #[tokio::test]
569    async fn should_coexist_writer_and_reader_without_fencing_error() {
570        let object_store = Arc::new(InMemory::new());
571        let path = "/test/db";
572
573        // Create writer
574        let db = DbBuilder::new(path, object_store.clone())
575            .build()
576            .await
577            .unwrap();
578        let storage = SlateDbStorage::new(Arc::new(db));
579
580        // Write initial data
581        storage
582            .put(vec![
583                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
584            ])
585            .await
586            .unwrap();
587        storage.flush().await.unwrap();
588
589        // Create reader while writer is still open - this should NOT cause fencing error
590        let reader = DbReader::builder(path, object_store.clone())
591            .build()
592            .await
593            .unwrap();
594        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
595
596        // Reader can read the data
597        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
598        assert!(record.is_some());
599        assert_eq!(record.unwrap().value, Bytes::from("value1"));
600
601        // Writer can still write more data
602        storage
603            .put(vec![
604                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
605            ])
606            .await
607            .unwrap();
608        storage.flush().await.unwrap();
609
610        storage.close().await.unwrap();
611    }
612
613    #[tokio::test]
614    async fn should_set_expire_ts_based_on_ttl() {
615        // given - storage configured with a 30 second default TTL
616        let object_store = Arc::new(InMemory::new());
617        let path = "/test/ttl_db";
618        let clock = Arc::new(MockSystemClock::new());
619
620        let db = DbBuilder::new(path, object_store.clone())
621            .with_settings(Settings {
622                default_ttl: Some(30_000),
623                ..Default::default()
624            })
625            .with_system_clock(clock.clone())
626            .build()
627            .await
628            .unwrap();
629        let storage = SlateDbStorage::new(Arc::new(db));
630
631        // Write three keys at time=0:
632        //   key1: expires after 20 seconds
633        //   key2: uses default TTL (30 seconds)
634        //   key3: never expires
635        storage
636            .put(vec![
637                PutRecordOp::new_with_options(
638                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
639                    PutOptions {
640                        ttl: Ttl::ExpireAfter(20_000),
641                    },
642                ),
643                PutRecordOp::new_with_options(
644                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
645                    PutOptions { ttl: Ttl::Default },
646                ),
647                PutRecordOp::new_with_options(
648                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
649                    PutOptions { ttl: Ttl::NoExpiry },
650                ),
651            ])
652            .await
653            .unwrap();
654
655        // then - key1 has expire_ts = 20_000 (time=0 + 20s TTL)
656        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
657        assert_eq!(kv1.expire_ts, Some(20_000));
658
659        // then - key2 has expire_ts = 30_000 (time=0 + 30s default TTL)
660        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
661        assert_eq!(kv2.expire_ts, Some(30_000));
662
663        // then - key3 has no expire_ts (NoExpiry)
664        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
665        assert_eq!(kv3.expire_ts, None);
666
667        storage.close().await.unwrap();
668    }
669
    /// Simple merge operator that concatenates existing and new values.
    ///
    /// Stateless unit struct used only by the merge tests in this module.
    struct ConcatMergeOperator;
672
673    impl MergeOperator for ConcatMergeOperator {
674        fn merge_batch(
675            &self,
676            _key: &Bytes,
677            existing_value: Option<Bytes>,
678            operands: &[Bytes],
679        ) -> Bytes {
680            let mut result = existing_value.unwrap_or_default().to_vec();
681            for operand in operands {
682                result.extend_from_slice(operand);
683            }
684            Bytes::from(result)
685        }
686    }
687
688    #[tokio::test]
689    async fn should_set_expire_ts_on_merge_records_based_on_ttl() {
690        // given - storage configured with a 30 second default TTL and a merge operator
691        let object_store = Arc::new(InMemory::new());
692        let path = "/test/merge_ttl_db";
693        let clock = Arc::new(MockSystemClock::new());
694
695        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
696        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
697        let db = DbBuilder::new(path, object_store.clone())
698            .with_settings(Settings {
699                default_ttl: Some(30_000),
700                ..Default::default()
701            })
702            .with_system_clock(clock.clone())
703            .with_merge_operator(Arc::new(slate_merge_op))
704            .build()
705            .await
706            .unwrap();
707        let storage = SlateDbStorage::new(Arc::new(db));
708
709        // Merge three keys at time=0:
710        //   key1: expires after 20 seconds
711        //   key2: uses default TTL (30 seconds)
712        //   key3: never expires
713        storage
714            .merge(vec![
715                MergeRecordOp::new_with_ttl(
716                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
717                    MergeOptions {
718                        ttl: Ttl::ExpireAfter(20_000),
719                    },
720                ),
721                MergeRecordOp::new_with_ttl(
722                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
723                    MergeOptions { ttl: Ttl::Default },
724                ),
725                MergeRecordOp::new_with_ttl(
726                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
727                    MergeOptions { ttl: Ttl::NoExpiry },
728                ),
729            ])
730            .await
731            .unwrap();
732
733        // then - key1 has expire_ts = 20_000 (time=0 + 20s TTL)
734        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
735        assert_eq!(kv1.value, Bytes::from("v1"));
736        assert_eq!(kv1.expire_ts, Some(20_000));
737
738        // then - key2 has expire_ts = 30_000 (time=0 + 30s default TTL)
739        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
740        assert_eq!(kv2.value, Bytes::from("v2"));
741        assert_eq!(kv2.expire_ts, Some(30_000));
742
743        // then - key3 has no expire_ts (NoExpiry)
744        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
745        assert_eq!(kv3.value, Bytes::from("v3"));
746        assert_eq!(kv3.expire_ts, None);
747
748        storage.close().await.unwrap();
749    }
750
    /// Helper: open a DbReader against the same path/object_store and try to
    /// read a key. Returns `true` if the key is present.
    ///
    /// Delegates to `reader_can_see_with_merge_op` with no merge operator.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }
756
757    async fn reader_can_see_with_merge_op(
758        path: &str,
759        object_store: Arc<InMemory>,
760        key: &str,
761        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
762    ) -> bool {
763        let mut builder = DbReader::builder(path, object_store);
764        if let Some(op) = merge_op {
765            builder = builder.with_merge_operator(op);
766        }
767        let reader = builder.build().await.unwrap();
768        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
769        storage_reader
770            .get(Bytes::from(key.to_owned()))
771            .await
772            .unwrap()
773            .is_some()
774    }
775
    /// A plain `put()` must not wait for durability: a freshly opened reader
    /// only sees the record after an explicit `flush()`.
    ///
    /// The order of the statements below matters, since the first assertion
    /// checks the state before the flush.
    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // put() uses WriteOptions::default() which is await_durable: false
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Data is in memtable only — a reader (which reads from durable state) should NOT see it
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After explicit flush, reader can see it
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
804
805    #[tokio::test]
806    async fn put_with_await_durable_true_is_visible_to_reader() {
807        let object_store = Arc::new(InMemory::new());
808        let path = "/test/put_durable";
809
810        let db = DbBuilder::new(path, object_store.clone())
811            .build()
812            .await
813            .unwrap();
814        let storage = SlateDbStorage::new(Arc::new(db));
815
816        // Write with await_durable: true — should be flushed before returning
817        storage
818            .put_with_options(
819                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
820                WriteOptions {
821                    await_durable: true,
822                },
823            )
824            .await
825            .unwrap();
826
827        // Reader should see it immediately without explicit flush
828        assert!(reader_can_see(path, object_store.clone(), "k1").await);
829
830        storage.close().await.unwrap();
831    }
832
    /// A plain `apply()` must not wait for durability: a freshly opened
    /// reader only sees the record after an explicit `flush()`.
    ///
    /// The order of the statements below matters, since the first assertion
    /// checks the state before the flush.
    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // apply() delegates with WriteOptions::default() (await_durable: false)
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        // Not flushed yet, so a new reader should not find the key.
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After the explicit flush a new reader should find it.
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
859
860    #[tokio::test]
861    async fn apply_with_await_durable_true_is_visible_to_reader() {
862        let object_store = Arc::new(InMemory::new());
863        let path = "/test/apply_durable";
864
865        let db = DbBuilder::new(path, object_store.clone())
866            .build()
867            .await
868            .unwrap();
869        let storage = SlateDbStorage::new(Arc::new(db));
870
871        storage
872            .apply_with_options(
873                vec![RecordOp::Put(
874                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
875                )],
876                WriteOptions {
877                    await_durable: true,
878                },
879            )
880            .await
881            .unwrap();
882
883        assert!(reader_can_see(path, object_store.clone(), "k1").await);
884
885        storage.close().await.unwrap();
886    }
887
    /// A plain `merge()` must not wait for durability: a freshly opened
    /// reader (configured with the same merge operator) only sees the record
    /// after an explicit `flush()`.
    ///
    /// The order of the statements below matters, since the first assertion
    /// checks the state before the flush.
    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";

        // The writer gets its own adapter around the shared test operator.
        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // merge() delegates with WriteOptions::default() (await_durable: false)
        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // A second adapter around the same operator is passed to each reader.
        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        // Not flushed yet, so a new reader should not find the key.
        assert!(
            !reader_can_see_with_merge_op(
                path,
                object_store.clone(),
                "k1",
                Some(reader_merge_op.clone()),
            )
            .await
        );

        // After the explicit flush a new reader should find it.
        storage.flush().await.unwrap();
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }
930
931    #[tokio::test]
932    async fn merge_with_await_durable_true_is_visible_to_reader() {
933        let object_store = Arc::new(InMemory::new());
934        let path = "/test/merge_durable";
935
936        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
937        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
938        let db = DbBuilder::new(path, object_store.clone())
939            .with_merge_operator(slate_merge_op.clone())
940            .build()
941            .await
942            .unwrap();
943        let storage = SlateDbStorage::new(Arc::new(db));
944
945        storage
946            .merge_with_options(
947                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
948                WriteOptions {
949                    await_durable: true,
950                },
951            )
952            .await
953            .unwrap();
954
955        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
956            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
957        assert!(
958            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
959                .await
960        );
961
962        storage.close().await.unwrap();
963    }
964}