//! SlateDB-backed implementations of the common storage traits.
//!
//! File: `common/storage/slate.rs`
1use std::sync::Arc;
2
3use crate::storage::{MergeOptions, PutOptions};
4use crate::{
5    BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
6    storage::{
7        MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
8        WriteOptions, WriteResult,
9    },
10};
11use async_trait::async_trait;
12use bytes::Bytes;
13use slatedb::config::ScanOptions;
14use slatedb::{
15    Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
16    MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
17};
18use tokio::sync::watch;
19
/// Thin wrapper that exposes a SlateDB [`ReadableStat`] as a Prometheus gauge.
///
/// Instead of snapshotting stats into an intermediate representation and
/// refreshing gauges, this reads the live atomic value on each encode/scrape.
///
/// NOTE(review): counter-typed stats are also encoded through `encode_gauge`
/// (see the `EncodeMetric` impl) — confirm that is intended for counters.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct ReadableStatGauge(std::sync::Arc<dyn slatedb::stats::ReadableStat>);
27
#[cfg(feature = "metrics")]
impl prometheus_client::encoding::EncodeMetric for ReadableStatGauge {
    /// Encodes the stat's current value; reads the live value on every scrape
    /// rather than a cached snapshot.
    fn encode(
        &self,
        mut encoder: prometheus_client::encoding::MetricEncoder,
    ) -> Result<(), std::fmt::Error> {
        encoder.encode_gauge(&self.0.get())
    }

    /// Maps SlateDB's metric type onto the Prometheus equivalent.
    fn metric_type(&self) -> prometheus_client::metrics::MetricType {
        match self.0.metric_type() {
            slatedb::stats::MetricType::Counter => prometheus_client::metrics::MetricType::Counter,
            slatedb::stats::MetricType::Gauge => prometheus_client::metrics::MetricType::Gauge,
        }
    }
}
44
/// Adapter that wraps our `MergeOperator` trait to implement SlateDB's `MergeOperator` trait.
///
/// This allows using our common merge operator interface with SlateDB's merge functionality.
pub struct SlateDbMergeOperatorAdapter {
    // The wrapped common operator that performs the actual merge logic.
    operator: Arc<dyn MergeOperator>,
}

impl SlateDbMergeOperatorAdapter {
    /// Wraps `operator`. Private: construct via
    /// `SlateDbStorage::merge_operator_adapter`.
    fn new(operator: Arc<dyn MergeOperator>) -> Self {
        Self { operator }
    }
}
57
58impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
59    fn merge(
60        &self,
61        key: &Bytes,
62        existing_value: Option<Bytes>,
63        value: Bytes,
64    ) -> Result<Bytes, MergeOperatorError> {
65        Ok(self.operator.merge(key, existing_value, value))
66    }
67
68    fn merge_batch(
69        &self,
70        key: &Bytes,
71        existing_value: Option<Bytes>,
72        operands: &[Bytes],
73    ) -> Result<Bytes, MergeOperatorError> {
74        if operands.is_empty() && existing_value.is_none() {
75            return Err(MergeOperatorError::EmptyBatch);
76        }
77        Ok(self.operator.merge_batch(key, existing_value, operands))
78    }
79}
80
81/// Returns the default scan options used for storage scans.
82fn default_scan_options() -> ScanOptions {
83    ScanOptions {
84        durability_filter: Default::default(),
85        dirty: false,
86        read_ahead_bytes: 1024 * 1024,
87        cache_blocks: true,
88        max_fetch_tasks: 4,
89    }
90}
91
/// SlateDB-backed implementation of the Storage trait.
///
/// SlateDB is an embedded key-value store built on object storage, providing
/// LSM-tree semantics with cloud-native durability.
pub struct SlateDbStorage {
    // Underlying SlateDB handle, shared with snapshots and iterators.
    pub(super) db: Arc<Db>,
    // Broadcasts the latest durable sequence number to `subscribe_durable` callers.
    durable_tx: watch::Sender<u64>,
    // Handle used to stop the background task that bridges SlateDB's
    // durability watch channel into `durable_tx` (aborted in `close`).
    durable_bridge_abort: tokio::task::AbortHandle,
}
101
102impl SlateDbStorage {
103    /// Creates a new SlateDbStorage instance wrapping the given SlateDB database.
104    pub fn new(db: Arc<Db>) -> Self {
105        let slate_rx = db.subscribe();
106        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
107        let task = tokio::spawn({
108            let tx = durable_tx.clone();
109            async move {
110                let mut slate_rx = slate_rx;
111                while slate_rx.changed().await.is_ok() {
112                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
113                    if tx.send(durable_seq).is_err() {
114                        break;
115                    }
116                }
117            }
118        });
119
120        Self {
121            db,
122            durable_tx,
123            durable_bridge_abort: task.abort_handle(),
124        }
125    }
126
127    /// Creates a SlateDB `MergeOperator` from our common `MergeOperator` trait.
128    ///
129    /// This adapter can be used when constructing a SlateDB database with a merge operator:
130    /// ```rust,ignore
131    /// use common::storage::MergeOperator;
132    /// use slatedb::{DbBuilder, object_store::ObjectStore};
133    ///
134    /// let my_merge_op: Arc<dyn MergeOperator> = Arc::new(MyMergeOperator);
135    /// let slate_merge_op = SlateDbStorage::merge_operator_adapter(my_merge_op);
136    ///
137    /// let db = DbBuilder::new("path", object_store)
138    ///     .with_merge_operator(Arc::new(slate_merge_op))
139    ///     .build()
140    ///     .await?;
141    /// ```
142    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
143        SlateDbMergeOperatorAdapter::new(operator)
144    }
145}
146
147#[async_trait]
148impl StorageRead for SlateDbStorage {
149    /// Retrieves a single record by key from SlateDB.
150    ///
151    /// Returns `None` if the key does not exist.
152    #[tracing::instrument(level = "trace", skip_all)]
153    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
154        let value = self
155            .db
156            .get(&key)
157            .await
158            .map_err(StorageError::from_storage)?;
159
160        match value {
161            Some(v) => Ok(Some(Record::new(key, v))),
162            None => Ok(None),
163        }
164    }
165
166    #[tracing::instrument(level = "trace", skip_all)]
167    async fn scan_iter(
168        &self,
169        range: BytesRange,
170    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
171        let iter = self
172            .db
173            .scan_with_options(range, &default_scan_options())
174            .await
175            .map_err(StorageError::from_storage)?;
176        Ok(Box::new(SlateDbIterator { iter }))
177    }
178}
179
180pub(super) struct SlateDbIterator {
181    iter: DbIterator,
182}
183
184#[async_trait]
185impl StorageIterator for SlateDbIterator {
186    #[tracing::instrument(level = "trace", skip_all)]
187    async fn next(&mut self) -> StorageResult<Option<Record>> {
188        match self.iter.next().await.map_err(StorageError::from_storage)? {
189            Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
190            None => Ok(None),
191        }
192    }
193}
194
/// SlateDB snapshot wrapper that implements StorageSnapshot.
///
/// Provides a consistent read-only view of the database at the time the snapshot was created.
pub struct SlateDbStorageSnapshot {
    // Shared handle to the underlying SlateDB snapshot.
    snapshot: Arc<DbSnapshot>,
}
201
202#[async_trait]
203impl StorageRead for SlateDbStorageSnapshot {
204    #[tracing::instrument(level = "trace", skip_all)]
205    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
206        let value = self
207            .snapshot
208            .get(&key)
209            .await
210            .map_err(StorageError::from_storage)?;
211
212        match value {
213            Some(v) => Ok(Some(Record::new(key, v))),
214            None => Ok(None),
215        }
216    }
217
218    #[tracing::instrument(level = "trace", skip_all)]
219    async fn scan_iter(
220        &self,
221        range: BytesRange,
222    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
223        let iter = self
224            .snapshot
225            .scan_with_options(range, &default_scan_options())
226            .await
227            .map_err(StorageError::from_storage)?;
228        Ok(Box::new(SlateDbIterator { iter }))
229    }
230}
231
// Marker impl: all snapshot read behavior comes from its `StorageRead` impl.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
234
235#[async_trait]
236impl Storage for SlateDbStorage {
237    async fn apply_with_options(
238        &self,
239        records: Vec<RecordOp>,
240        options: WriteOptions,
241    ) -> StorageResult<WriteResult> {
242        let mut batch = WriteBatch::new();
243        for op in records {
244            match op {
245                RecordOp::Put(op) => {
246                    batch.put_with_options(op.record.key, op.record.value, &op.options.into())
247                }
248                RecordOp::Merge(op) => {
249                    batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
250                }
251                RecordOp::Delete(key) => batch.delete(key),
252            }
253        }
254        let slate_options = SlateDbWriteOptions {
255            await_durable: options.await_durable,
256        };
257        let write_handle = self
258            .db
259            .write_with_options(batch, &slate_options)
260            .await
261            .map_err(StorageError::from_storage)?;
262        Ok(WriteResult {
263            seqnum: write_handle.seqnum(),
264        })
265    }
266
267    async fn put_with_options(
268        &self,
269        records: Vec<PutRecordOp>,
270        options: WriteOptions,
271    ) -> StorageResult<WriteResult> {
272        let mut batch = WriteBatch::new();
273        for op in records {
274            batch.put_with_options(op.record.key, op.record.value, &op.options.into());
275        }
276        let slate_options = SlateDbWriteOptions {
277            await_durable: options.await_durable,
278        };
279        let write_handle = self
280            .db
281            .write_with_options(batch, &slate_options)
282            .await
283            .map_err(StorageError::from_storage)?;
284        Ok(WriteResult {
285            seqnum: write_handle.seqnum(),
286        })
287    }
288
289    async fn merge_with_options(
290        &self,
291        records: Vec<MergeRecordOp>,
292        options: WriteOptions,
293    ) -> StorageResult<WriteResult> {
294        let mut batch = WriteBatch::new();
295        for op in records {
296            batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
297        }
298        let slate_options = SlateDbWriteOptions {
299            await_durable: options.await_durable,
300        };
301        let write_handle = self
302            .db
303            .write_with_options(batch, &slate_options)
304            .await
305            .map_err(|e| {
306                let error_msg = e.to_string();
307                if error_msg.contains("merge operator") || error_msg.contains("not configured") {
308                    StorageError::Storage(
309                        "Merge operator not configured for this database".to_string(),
310                    )
311                } else {
312                    StorageError::from_storage(e)
313                }
314            })?;
315        Ok(WriteResult {
316            seqnum: write_handle.seqnum(),
317        })
318    }
319
320    fn subscribe_durable(&self) -> watch::Receiver<u64> {
321        self.durable_tx.subscribe()
322    }
323
324    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
325        let snapshot = self
326            .db
327            .snapshot()
328            .await
329            .map_err(StorageError::from_storage)?;
330        Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
331    }
332
333    async fn flush(&self) -> StorageResult<()> {
334        self.db.flush().await.map_err(StorageError::from_storage)?;
335        Ok(())
336    }
337
338    async fn close(&self) -> StorageResult<()> {
339        // Stop durable bridge first so no status subscriber outlives DB close.
340        self.durable_bridge_abort.abort();
341        self.db.close().await.map_err(StorageError::from_storage)?;
342        Ok(())
343    }
344
345    #[cfg(feature = "metrics")]
346    fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
347        let stat_registry = self.db.metrics();
348        let mut seen = std::collections::HashSet::new();
349        for name in stat_registry.names() {
350            if let Some(stat) = stat_registry.lookup(name) {
351                let sanitized: String = name
352                    .chars()
353                    .map(|c| {
354                        if c.is_ascii_alphanumeric() || c == '_' {
355                            c
356                        } else {
357                            '_'
358                        }
359                    })
360                    .collect();
361                let prom_name = format!("slatedb_{sanitized}");
362                if !seen.insert(prom_name.clone()) {
363                    tracing::warn!(
364                        "Duplicate metric name after sanitization: {prom_name:?} (from {name:?}, skipped)"
365                    );
366                    continue;
367                }
368                registry.register(
369                    &prom_name,
370                    format!("SlateDB {name}"),
371                    ReadableStatGauge(stat),
372                );
373            }
374        }
375    }
376}
377
378impl From<Ttl> for slatedb::config::Ttl {
379    fn from(value: Ttl) -> Self {
380        match value {
381            Ttl::Default => slatedb::config::Ttl::Default,
382            Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
383            Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
384        }
385    }
386}
387
impl From<PutOptions> for slatedb::config::PutOptions {
    /// Converts our put options into SlateDB's; only the TTL is mapped.
    fn from(value: PutOptions) -> Self {
        Self {
            ttl: value.ttl.into(),
        }
    }
}
395
impl From<MergeOptions> for slatedb::config::MergeOptions {
    /// Converts our merge options into SlateDB's; only the TTL is mapped.
    fn from(value: MergeOptions) -> Self {
        Self {
            ttl: value.ttl.into(),
        }
    }
}
403
/// Read-only SlateDB storage using `DbReader`.
///
/// This struct provides read-only access to a SlateDB database without fencing,
/// allowing multiple readers to coexist with a single writer.
pub struct SlateDbStorageReader {
    // Shared read-only handle; per the durability tests in this file, it
    // observes only flushed (durable) state, not the writer's memtable.
    reader: Arc<DbReader>,
}

impl SlateDbStorageReader {
    /// Creates a new SlateDbStorageReader wrapping the given DbReader.
    pub fn new(reader: Arc<DbReader>) -> Self {
        Self { reader }
    }
}
418
419#[async_trait]
420impl StorageRead for SlateDbStorageReader {
421    #[tracing::instrument(level = "trace", skip_all)]
422    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
423        let value = self
424            .reader
425            .get(&key)
426            .await
427            .map_err(StorageError::from_storage)?;
428
429        match value {
430            Some(v) => Ok(Some(Record::new(key, v))),
431            None => Ok(None),
432        }
433    }
434
435    #[tracing::instrument(level = "trace", skip_all)]
436    async fn scan_iter(
437        &self,
438        range: BytesRange,
439    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
440        let iter = self
441            .reader
442            .scan_with_options(range, &default_scan_options())
443            .await
444            .map_err(StorageError::from_storage)?;
445        Ok(Box::new(SlateDbIterator { iter }))
446    }
447}
448
449#[cfg(test)]
450mod tests {
451    use super::*;
452    use crate::BytesRange;
453    use slatedb::DbBuilder;
454    use slatedb::config::{DbReaderOptions, Settings};
455    use slatedb::object_store::memory::InMemory;
456    use slatedb_common::clock::MockSystemClock;
457
    /// A reader opened against the same path/object store sees keys the
    /// writer has flushed, and misses keys that were never written.
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        // Flush so the writes become durable and therefore reader-visible.
        storage.flush().await.unwrap();

        // Create reader and verify data
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        // A key that was never written must come back as None.
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }
498
    /// An unbounded scan through the reader returns all flushed records,
    /// in key order.
    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        // Flush so the writes become durable and therefore reader-visible.
        storage.flush().await.unwrap();

        // Create reader and scan data
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        // All three records come back, ordered by key.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }
543
    /// Opening a `DbReader` while the writer is live must not fence the
    /// writer: the reader reads, then the writer keeps writing successfully.
    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write initial data
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader while writer is still open - this should NOT cause fencing error
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // Reader can read the data
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Writer can still write more data
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }
587
    /// Per-record TTLs, the database default TTL, and NoExpiry are all
    /// honored as the mock clock advances past each deadline.
    #[tokio::test]
    async fn should_expire_records_based_on_ttl() {
        // given - storage configured with a 30 second default TTL
        // (all TTL and clock values below are in milliseconds)
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - all three keys are present at time=0
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when - advance to 25 seconds
        clock.set(25_000);

        // then - key1 (20s TTL) expired; key2 (30s default) and key3 (no expiry) still present
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when - advance to 35 seconds
        clock.set(35_000);

        // then - key1 and key2 expired; key3 (no expiry) still present with correct value
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value3"));

        storage.close().await.unwrap();
    }
655
656    /// Simple merge operator that concatenates existing and new values.
657    struct ConcatMergeOperator;
658
659    impl MergeOperator for ConcatMergeOperator {
660        fn merge(&self, _key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes {
661            match existing_value {
662                Some(existing) => {
663                    let mut result = Vec::with_capacity(existing.len() + new_value.len());
664                    result.extend_from_slice(&existing);
665                    result.extend_from_slice(&new_value);
666                    Bytes::from(result)
667                }
668                None => new_value,
669            }
670        }
671    }
672
    /// TTL handling for merge writes mirrors the put-based TTL test above:
    /// per-record TTL, default TTL, and NoExpiry all honored as the mock
    /// clock advances.
    #[tokio::test]
    async fn should_expire_merge_records_based_on_ttl() {
        // given - storage configured with a 30 second default TTL and a merge operator
        // (all TTL and clock values below are in milliseconds)
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Merge three keys at time=0 (no existing values, so each merge
        // stores its operand as-is):
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - all three keys are present at time=0
        assert_eq!(
            storage
                .get(Bytes::from("key1"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v1")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key2"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v2")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key3"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v3")
        );

        // when - advance to 25 seconds
        clock.set(25_000);

        // then - key1 (20s TTL) expired; key2 (30s default) and key3 (no expiry) still present
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when - advance to 35 seconds
        clock.set(35_000);

        // then - key1 and key2 expired; key3 (no expiry) still present with correct value
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("v3"));

        storage.close().await.unwrap();
    }
767
    /// Helper: open a DbReader against the same path/object_store and try to
    /// read a key. Returns `true` if the key is present.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    /// Like `reader_can_see`, but lets the caller supply a merge operator for
    /// the reader (used when the data under test was written via merges).
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let options = DbReaderOptions {
            merge_operator: merge_op,
            ..Default::default()
        };
        let reader = DbReader::open(path, object_store, None, options)
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }
794
    /// `put()` without options must not wait for durability: a fresh reader
    /// cannot see the write until an explicit flush.
    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // put() uses WriteOptions::default() which is await_durable: false
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Data is in memtable only — a reader (which reads from durable state) should NOT see it
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After explicit flush, reader can see it
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
823
    /// `put_with_options(await_durable: true)` must flush before returning,
    /// making the write immediately visible to a fresh reader.
    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write with await_durable: true — should be flushed before returning
        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // Reader should see it immediately without explicit flush
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
851
    /// `apply()` without options must not wait for durability: a fresh reader
    /// cannot see the write until an explicit flush.
    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // apply() delegates with WriteOptions::default() (await_durable: false)
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        // Not yet durable, so a fresh reader must not see it.
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After explicit flush, the reader can.
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
878
879    #[tokio::test]
880    async fn apply_with_await_durable_true_is_visible_to_reader() {
881        let object_store = Arc::new(InMemory::new());
882        let path = "/test/apply_durable";
883
884        let db = DbBuilder::new(path, object_store.clone())
885            .build()
886            .await
887            .unwrap();
888        let storage = SlateDbStorage::new(Arc::new(db));
889
890        storage
891            .apply_with_options(
892                vec![RecordOp::Put(
893                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
894                )],
895                WriteOptions {
896                    await_durable: true,
897                },
898            )
899            .await
900            .unwrap();
901
902        assert!(reader_can_see(path, object_store.clone(), "k1").await);
903
904        storage.close().await.unwrap();
905    }
906
907    #[tokio::test]
908    async fn merge_defaults_to_not_await_durable() {
909        let object_store = Arc::new(InMemory::new());
910        let path = "/test/merge_default_durability";
911
912        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
913        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
914        let db = DbBuilder::new(path, object_store.clone())
915            .with_merge_operator(slate_merge_op.clone())
916            .build()
917            .await
918            .unwrap();
919        let storage = SlateDbStorage::new(Arc::new(db));
920
921        // merge() delegates with WriteOptions::default() (await_durable: false)
922        storage
923            .merge(vec![
924                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
925            ])
926            .await
927            .unwrap();
928
929        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
930            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
931        assert!(
932            !reader_can_see_with_merge_op(
933                path,
934                object_store.clone(),
935                "k1",
936                Some(reader_merge_op.clone()),
937            )
938            .await
939        );
940
941        storage.flush().await.unwrap();
942        assert!(
943            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
944                .await
945        );
946
947        storage.close().await.unwrap();
948    }
949
950    #[tokio::test]
951    async fn merge_with_await_durable_true_is_visible_to_reader() {
952        let object_store = Arc::new(InMemory::new());
953        let path = "/test/merge_durable";
954
955        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
956        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
957        let db = DbBuilder::new(path, object_store.clone())
958            .with_merge_operator(slate_merge_op.clone())
959            .build()
960            .await
961            .unwrap();
962        let storage = SlateDbStorage::new(Arc::new(db));
963
964        storage
965            .merge_with_options(
966                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
967                WriteOptions {
968                    await_durable: true,
969                },
970            )
971            .await
972            .unwrap();
973
974        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
975            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
976        assert!(
977            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
978                .await
979        );
980
981        storage.close().await.unwrap();
982    }
983}
984
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
    use super::ReadableStatGauge;
    use prometheus_client::encoding::EncodeMetric;
    use slatedb::stats::{MetricType as SlateMetricType, ReadableStat};
    use std::sync::Arc;

    /// Minimal `ReadableStat` stub reporting a fixed value (0) and a
    /// configurable metric type, so the type-mapping can be tested in
    /// isolation.
    #[derive(Debug)]
    struct MockStat {
        metric_type: SlateMetricType,
    }

    impl ReadableStat for MockStat {
        fn get(&self) -> i64 {
            0
        }

        fn metric_type(&self) -> SlateMetricType {
            self.metric_type
        }
    }

    /// Builds a gauge wrapping a stub stat of the given SlateDB metric type.
    fn gauge_for(metric_type: SlateMetricType) -> ReadableStatGauge {
        ReadableStatGauge(Arc::new(MockStat { metric_type }))
    }

    #[test]
    fn should_return_counter_when_slate_metric_type_is_counter() {
        // A SlateDB counter maps to a Prometheus counter.
        let mapped = gauge_for(SlateMetricType::Counter).metric_type();
        assert!(matches!(
            mapped,
            prometheus_client::metrics::MetricType::Counter
        ));
    }

    #[test]
    fn should_return_gauge_when_slate_metric_type_is_gauge() {
        // A SlateDB gauge maps to a Prometheus gauge.
        let mapped = gauge_for(SlateMetricType::Gauge).metric_type();
        assert!(matches!(
            mapped,
            prometheus_client::metrics::MetricType::Gauge
        ));
    }
}