//! common/storage/slate.rs
//!
//! SlateDB-backed implementations of the common storage traits
//! (`Storage`, `StorageRead`, `StorageIterator`, `StorageSnapshot`).
1use std::sync::Arc;
2
3use crate::storage::{MergeOptions, PutOptions};
4use crate::{
5    BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
6    storage::{
7        MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
8        WriteOptions, WriteResult,
9    },
10};
11use async_trait::async_trait;
12use bytes::Bytes;
13use slatedb::config::ScanOptions;
14use slatedb::{
15    Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
16    MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
17};
18use tokio::sync::watch;
19
/// Thin wrapper that exposes a SlateDB [`ReadableStat`] as a Prometheus gauge.
///
/// Instead of snapshotting stats into an intermediate representation and
/// refreshing gauges, this reads the live atomic value on each encode/scrape.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct ReadableStatGauge(Arc<dyn slatedb::stats::ReadableStat>);
27
#[cfg(feature = "metrics")]
impl prometheus_client::encoding::EncodeMetric for ReadableStatGauge {
    /// Reads the live atomic value at scrape time and encodes it as a gauge.
    fn encode(
        &self,
        mut encoder: prometheus_client::encoding::MetricEncoder,
    ) -> Result<(), std::fmt::Error> {
        let current = self.0.get();
        encoder.encode_gauge(&current)
    }

    /// Maps SlateDB's metric kind onto the Prometheus equivalent.
    fn metric_type(&self) -> prometheus_client::metrics::MetricType {
        use prometheus_client::metrics::MetricType as PromType;
        match self.0.metric_type() {
            slatedb::stats::MetricType::Counter => PromType::Counter,
            slatedb::stats::MetricType::Gauge => PromType::Gauge,
        }
    }
}
44
45/// Adapter that wraps our `MergeOperator` trait to implement SlateDB's `MergeOperator` trait.
46///
47/// This allows using our common merge operator interface with SlateDB's merge functionality.
48pub struct SlateDbMergeOperatorAdapter {
49    operator: Arc<dyn MergeOperator>,
50}
51
52impl SlateDbMergeOperatorAdapter {
53    fn new(operator: Arc<dyn MergeOperator>) -> Self {
54        Self { operator }
55    }
56}
57
58impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
59    fn merge(
60        &self,
61        key: &Bytes,
62        existing_value: Option<Bytes>,
63        value: Bytes,
64    ) -> Result<Bytes, MergeOperatorError> {
65        Ok(self.operator.merge_batch(key, existing_value, &[value]))
66    }
67
68    fn merge_batch(
69        &self,
70        key: &Bytes,
71        existing_value: Option<Bytes>,
72        operands: &[Bytes],
73    ) -> Result<Bytes, MergeOperatorError> {
74        if operands.is_empty() && existing_value.is_none() {
75            return Err(MergeOperatorError::EmptyBatch);
76        }
77        Ok(self.operator.merge_batch(key, existing_value, operands))
78    }
79}
80
/// Returns the default scan options used for storage scans.
fn default_scan_options() -> ScanOptions {
    ScanOptions {
        // Accept SlateDB's default durability level for reads.
        durability_filter: Default::default(),
        // Dirty reads disabled — presumably hides not-yet-committed writes;
        // TODO confirm against SlateDB docs.
        dirty: false,
        // Prefetch 1 MiB ahead of the iterator position.
        read_ahead_bytes: 1024 * 1024,
        // Populate the block cache with blocks touched by the scan.
        cache_blocks: true,
        // Cap on concurrent background fetch tasks for this scan.
        max_fetch_tasks: 4,
    }
}
91
/// SlateDB-backed implementation of the Storage trait.
///
/// SlateDB is an embedded key-value store built on object storage, providing
/// LSM-tree semantics with cloud-native durability.
pub struct SlateDbStorage {
    // Underlying SlateDB handle; shared with the submodule (pub(super)).
    pub(super) db: Arc<Db>,
    // Sender side of our own durable-seq channel; `subscribe_durable`
    // hands out receivers from it.
    durable_tx: watch::Sender<u64>,
    // Handle used to stop the background task that bridges SlateDB's
    // status channel into `durable_tx` when the storage is closed.
    durable_bridge_abort: tokio::task::AbortHandle,
}
101
102impl SlateDbStorage {
103    /// Creates a new SlateDbStorage instance wrapping the given SlateDB database.
104    pub fn new(db: Arc<Db>) -> Self {
105        let slate_rx = db.subscribe();
106        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
107        let task = tokio::spawn({
108            let tx = durable_tx.clone();
109            async move {
110                let mut slate_rx = slate_rx;
111                while slate_rx.changed().await.is_ok() {
112                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
113                    if tx.send(durable_seq).is_err() {
114                        break;
115                    }
116                }
117            }
118        });
119
120        Self {
121            db,
122            durable_tx,
123            durable_bridge_abort: task.abort_handle(),
124        }
125    }
126
127    /// Creates a SlateDB `MergeOperator` from our common `MergeOperator` trait.
128    ///
129    /// This adapter can be used when constructing a SlateDB database with a merge operator:
130    /// ```rust,ignore
131    /// use common::storage::MergeOperator;
132    /// use slatedb::{DbBuilder, object_store::ObjectStore};
133    ///
134    /// let my_merge_op: Arc<dyn MergeOperator> = Arc::new(MyMergeOperator);
135    /// let slate_merge_op = SlateDbStorage::merge_operator_adapter(my_merge_op);
136    ///
137    /// let db = DbBuilder::new("path", object_store)
138    ///     .with_merge_operator(Arc::new(slate_merge_op))
139    ///     .build()
140    ///     .await?;
141    /// ```
142    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
143        SlateDbMergeOperatorAdapter::new(operator)
144    }
145}
146
147#[async_trait]
148impl StorageRead for SlateDbStorage {
149    /// Retrieves a single record by key from SlateDB.
150    ///
151    /// Returns `None` if the key does not exist.
152    #[tracing::instrument(level = "trace", skip_all)]
153    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
154        let value = self
155            .db
156            .get(&key)
157            .await
158            .map_err(StorageError::from_storage)?;
159
160        match value {
161            Some(v) => Ok(Some(Record::new(key, v))),
162            None => Ok(None),
163        }
164    }
165
166    #[tracing::instrument(level = "trace", skip_all)]
167    async fn scan_iter(
168        &self,
169        range: BytesRange,
170    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
171        let iter = self
172            .db
173            .scan_with_options(range, &default_scan_options())
174            .await
175            .map_err(StorageError::from_storage)?;
176        Ok(Box::new(SlateDbIterator { iter }))
177    }
178}
179
180pub(super) struct SlateDbIterator {
181    iter: DbIterator,
182}
183
184#[async_trait]
185impl StorageIterator for SlateDbIterator {
186    #[tracing::instrument(level = "trace", skip_all)]
187    async fn next(&mut self) -> StorageResult<Option<Record>> {
188        match self.iter.next().await.map_err(StorageError::from_storage)? {
189            Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
190            None => Ok(None),
191        }
192    }
193}
194
195/// SlateDB snapshot wrapper that implements StorageSnapshot.
196///
197/// Provides a consistent read-only view of the database at the time the snapshot was created.
198pub struct SlateDbStorageSnapshot {
199    snapshot: Arc<DbSnapshot>,
200}
201
202#[async_trait]
203impl StorageRead for SlateDbStorageSnapshot {
204    #[tracing::instrument(level = "trace", skip_all)]
205    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
206        let value = self
207            .snapshot
208            .get(&key)
209            .await
210            .map_err(StorageError::from_storage)?;
211
212        match value {
213            Some(v) => Ok(Some(Record::new(key, v))),
214            None => Ok(None),
215        }
216    }
217
218    #[tracing::instrument(level = "trace", skip_all)]
219    async fn scan_iter(
220        &self,
221        range: BytesRange,
222    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
223        let iter = self
224            .snapshot
225            .scan_with_options(range, &default_scan_options())
226            .await
227            .map_err(StorageError::from_storage)?;
228        Ok(Box::new(SlateDbIterator { iter }))
229    }
230}
231
232#[async_trait]
233impl StorageSnapshot for SlateDbStorageSnapshot {}
234
235#[async_trait]
236impl Storage for SlateDbStorage {
237    async fn apply_with_options(
238        &self,
239        records: Vec<RecordOp>,
240        options: WriteOptions,
241    ) -> StorageResult<WriteResult> {
242        let mut batch = WriteBatch::new();
243        for op in records {
244            match op {
245                RecordOp::Put(op) => {
246                    batch.put_with_options(op.record.key, op.record.value, &op.options.into())
247                }
248                RecordOp::Merge(op) => {
249                    batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
250                }
251                RecordOp::Delete(key) => batch.delete(key),
252            }
253        }
254        let slate_options = SlateDbWriteOptions {
255            await_durable: options.await_durable,
256        };
257        let write_handle = self
258            .db
259            .write_with_options(batch, &slate_options)
260            .await
261            .map_err(StorageError::from_storage)?;
262        Ok(WriteResult {
263            seqnum: write_handle.seqnum(),
264        })
265    }
266
267    async fn put_with_options(
268        &self,
269        records: Vec<PutRecordOp>,
270        options: WriteOptions,
271    ) -> StorageResult<WriteResult> {
272        let mut batch = WriteBatch::new();
273        for op in records {
274            batch.put_with_options(op.record.key, op.record.value, &op.options.into());
275        }
276        let slate_options = SlateDbWriteOptions {
277            await_durable: options.await_durable,
278        };
279        let write_handle = self
280            .db
281            .write_with_options(batch, &slate_options)
282            .await
283            .map_err(StorageError::from_storage)?;
284        Ok(WriteResult {
285            seqnum: write_handle.seqnum(),
286        })
287    }
288
289    async fn merge_with_options(
290        &self,
291        records: Vec<MergeRecordOp>,
292        options: WriteOptions,
293    ) -> StorageResult<WriteResult> {
294        let mut batch = WriteBatch::new();
295        for op in records {
296            batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
297        }
298        let slate_options = SlateDbWriteOptions {
299            await_durable: options.await_durable,
300        };
301        let write_handle = self
302            .db
303            .write_with_options(batch, &slate_options)
304            .await
305            .map_err(|e| {
306                let error_msg = e.to_string();
307                if error_msg.contains("merge operator") || error_msg.contains("not configured") {
308                    StorageError::Storage(
309                        "Merge operator not configured for this database".to_string(),
310                    )
311                } else {
312                    StorageError::from_storage(e)
313                }
314            })?;
315        Ok(WriteResult {
316            seqnum: write_handle.seqnum(),
317        })
318    }
319
320    fn subscribe_durable(&self) -> watch::Receiver<u64> {
321        self.durable_tx.subscribe()
322    }
323
324    async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
325        let snapshot = self
326            .db
327            .snapshot()
328            .await
329            .map_err(StorageError::from_storage)?;
330        Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
331    }
332
333    async fn flush(&self) -> StorageResult<()> {
334        self.db.flush().await.map_err(StorageError::from_storage)?;
335        Ok(())
336    }
337
338    async fn close(&self) -> StorageResult<()> {
339        // Stop durable bridge first so no status subscriber outlives DB close.
340        self.durable_bridge_abort.abort();
341        self.db.close().await.map_err(StorageError::from_storage)?;
342        Ok(())
343    }
344
345    #[cfg(feature = "metrics")]
346    fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
347        let stat_registry = self.db.metrics();
348        let mut seen = std::collections::HashSet::new();
349        for name in stat_registry.names() {
350            if let Some(stat) = stat_registry.lookup(name) {
351                let sanitized: String = name
352                    .chars()
353                    .map(|c| {
354                        if c.is_ascii_alphanumeric() || c == '_' {
355                            c
356                        } else {
357                            '_'
358                        }
359                    })
360                    .collect();
361                let prom_name = format!("slatedb_{sanitized}");
362                if !seen.insert(prom_name.clone()) {
363                    tracing::warn!(
364                        "Duplicate metric name after sanitization: {prom_name:?} (from {name:?}, skipped)"
365                    );
366                    continue;
367                }
368                registry.register(
369                    &prom_name,
370                    format!("SlateDB {name}"),
371                    ReadableStatGauge(stat),
372                );
373            }
374        }
375    }
376}
377
378impl From<Ttl> for slatedb::config::Ttl {
379    fn from(value: Ttl) -> Self {
380        match value {
381            Ttl::Default => slatedb::config::Ttl::Default,
382            Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
383            Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
384        }
385    }
386}
387
388impl From<PutOptions> for slatedb::config::PutOptions {
389    fn from(value: PutOptions) -> Self {
390        Self {
391            ttl: value.ttl.into(),
392        }
393    }
394}
395
396impl From<MergeOptions> for slatedb::config::MergeOptions {
397    fn from(value: MergeOptions) -> Self {
398        Self {
399            ttl: value.ttl.into(),
400        }
401    }
402}
403
/// Read-only SlateDB storage using `DbReader`.
///
/// This struct provides read-only access to a SlateDB database without fencing,
/// allowing multiple readers to coexist with a single writer.
pub struct SlateDbStorageReader {
    // Shared read-only handle to the SlateDB database.
    reader: Arc<DbReader>,
}

impl SlateDbStorageReader {
    /// Creates a new SlateDbStorageReader wrapping the given DbReader.
    pub fn new(reader: Arc<DbReader>) -> Self {
        Self { reader }
    }
}
418
419#[async_trait]
420impl StorageRead for SlateDbStorageReader {
421    #[tracing::instrument(level = "trace", skip_all)]
422    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
423        let value = self
424            .reader
425            .get(&key)
426            .await
427            .map_err(StorageError::from_storage)?;
428
429        match value {
430            Some(v) => Ok(Some(Record::new(key, v))),
431            None => Ok(None),
432        }
433    }
434
435    #[tracing::instrument(level = "trace", skip_all)]
436    async fn scan_iter(
437        &self,
438        range: BytesRange,
439    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
440        let iter = self
441            .reader
442            .scan_with_options(range, &default_scan_options())
443            .await
444            .map_err(StorageError::from_storage)?;
445        Ok(Box::new(SlateDbIterator { iter }))
446    }
447}
448
449#[cfg(test)]
450mod tests {
451    use super::*;
452    use crate::BytesRange;
453    use slatedb::DbBuilder;
454    use slatedb::config::{DbReaderOptions, Settings};
455    use slatedb::object_store::memory::InMemory;
456    use slatedb_common::clock::MockSystemClock;
457
    /// Data written and flushed by `SlateDbStorage` should be readable through
    /// a separately-opened `SlateDbStorageReader`; absent keys return `None`.
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        // Flush so the data reaches durable storage where the reader can see it.
        storage.flush().await.unwrap();

        // Create reader and verify data
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        // A key that was never written is absent.
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }
498
    /// Scanning an unbounded range through the reader should yield every
    /// flushed record, in key order.
    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer and write data
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader and scan data
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        // All three records come back, sorted by key.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }
543
    /// A `DbReader` opened while the writer is still active must not fence the
    /// writer: reads and subsequent writes should both keep working.
    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        // Create writer
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write initial data
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // Create reader while writer is still open - this should NOT cause fencing error
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // Reader can read the data
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Writer can still write more data
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }
587
    /// Records written via `put` should honor per-record TTLs (explicit,
    /// database-default, and no-expiry) as the mock clock advances.
    #[tokio::test]
    async fn should_expire_records_based_on_ttl() {
        // given - storage configured with a 30 second default TTL
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - all three keys are present at time=0
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when - advance to 25 seconds
        clock.set(25_000);

        // then - key1 (20s TTL) expired; key2 (30s default) and key3 (no expiry) still present
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when - advance to 35 seconds
        clock.set(35_000);

        // then - key1 and key2 expired; key3 (no expiry) still present with correct value
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value3"));

        storage.close().await.unwrap();
    }
655
656    /// Simple merge operator that concatenates existing and new values.
657    struct ConcatMergeOperator;
658
659    impl MergeOperator for ConcatMergeOperator {
660        fn merge_batch(
661            &self,
662            _key: &Bytes,
663            existing_value: Option<Bytes>,
664            operands: &[Bytes],
665        ) -> Bytes {
666            let mut result = existing_value.unwrap_or_default().to_vec();
667            for operand in operands {
668                result.extend_from_slice(operand);
669            }
670            Bytes::from(result)
671        }
672    }
673
    /// Records written via `merge` should honor per-record TTLs exactly like
    /// puts do: explicit, database-default, and no-expiry.
    #[tokio::test]
    async fn should_expire_merge_records_based_on_ttl() {
        // given - storage configured with a 30 second default TTL and a merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Merge three keys at time=0:
        //   key1: expires after 20 seconds
        //   key2: uses default TTL (30 seconds)
        //   key3: never expires
        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then - all three keys are present at time=0
        assert_eq!(
            storage
                .get(Bytes::from("key1"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v1")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key2"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v2")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key3"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v3")
        );

        // when - advance to 25 seconds
        clock.set(25_000);

        // then - key1 (20s TTL) expired; key2 (30s default) and key3 (no expiry) still present
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when - advance to 35 seconds
        clock.set(35_000);

        // then - key1 and key2 expired; key3 (no expiry) still present with correct value
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("v3"));

        storage.close().await.unwrap();
    }
768
    /// Helper: open a DbReader against the same path/object_store and try to
    /// read a key. Returns `true` if the key is present.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        // No merge operator configured: sufficient for plain put records.
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }
774
    /// Like `reader_can_see`, but lets the caller configure the reader with a
    /// merge operator so merged records can be reconstructed on read.
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let options = DbReaderOptions {
            merge_operator: merge_op,
            ..Default::default()
        };
        let reader = DbReader::open(path, object_store, None, options)
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }
795
    /// `put` uses default write options (no durability wait), so a fresh
    /// reader only sees the data after an explicit flush.
    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // put() uses WriteOptions::default() which is await_durable: false
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // Data is in memtable only — a reader (which reads from durable state) should NOT see it
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // After explicit flush, reader can see it
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
824
    /// `put_with_options` with `await_durable: true` blocks until durable, so
    /// a fresh reader sees the data without any explicit flush.
    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // Write with await_durable: true — should be flushed before returning
        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // Reader should see it immediately without explicit flush
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
852
    /// `apply` (mixed-op batch) also defaults to not awaiting durability:
    /// the reader only sees the data after an explicit flush.
    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // apply() delegates with WriteOptions::default() (await_durable: false)
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        // Not durable yet — invisible to a fresh reader.
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
879
    /// `apply_with_options` with `await_durable: true` makes the batch durable
    /// before returning, so a fresh reader sees it immediately.
    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .apply_with_options(
                vec![RecordOp::Put(
                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                )],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // Durable on return — no flush needed for the reader to see it.
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }
907
908    #[tokio::test]
909    async fn merge_defaults_to_not_await_durable() {
910        let object_store = Arc::new(InMemory::new());
911        let path = "/test/merge_default_durability";
912
913        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
914        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
915        let db = DbBuilder::new(path, object_store.clone())
916            .with_merge_operator(slate_merge_op.clone())
917            .build()
918            .await
919            .unwrap();
920        let storage = SlateDbStorage::new(Arc::new(db));
921
922        // merge() delegates with WriteOptions::default() (await_durable: false)
923        storage
924            .merge(vec![
925                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
926            ])
927            .await
928            .unwrap();
929
930        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
931            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
932        assert!(
933            !reader_can_see_with_merge_op(
934                path,
935                object_store.clone(),
936                "k1",
937                Some(reader_merge_op.clone()),
938            )
939            .await
940        );
941
942        storage.flush().await.unwrap();
943        assert!(
944            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
945                .await
946        );
947
948        storage.close().await.unwrap();
949    }
950
951    #[tokio::test]
952    async fn merge_with_await_durable_true_is_visible_to_reader() {
953        let object_store = Arc::new(InMemory::new());
954        let path = "/test/merge_durable";
955
956        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
957        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
958        let db = DbBuilder::new(path, object_store.clone())
959            .with_merge_operator(slate_merge_op.clone())
960            .build()
961            .await
962            .unwrap();
963        let storage = SlateDbStorage::new(Arc::new(db));
964
965        storage
966            .merge_with_options(
967                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
968                WriteOptions {
969                    await_durable: true,
970                },
971            )
972            .await
973            .unwrap();
974
975        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
976            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
977        assert!(
978            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
979                .await
980        );
981
982        storage.close().await.unwrap();
983    }
984}
985
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
    use super::ReadableStatGauge;
    use prometheus_client::encoding::EncodeMetric;
    use slatedb::stats::{MetricType as SlateMetricType, ReadableStat};
    use std::sync::Arc;

    /// Minimal `ReadableStat` stub reporting a fixed metric type and a
    /// constant value of zero.
    #[derive(Debug)]
    struct MockStat {
        metric_type: SlateMetricType,
    }

    impl ReadableStat for MockStat {
        fn get(&self) -> i64 {
            0
        }

        fn metric_type(&self) -> SlateMetricType {
            self.metric_type
        }
    }

    /// Wraps a stub stat of the given SlateDB metric type in a gauge adapter.
    fn gauge_for(metric_type: SlateMetricType) -> ReadableStatGauge {
        ReadableStatGauge(Arc::new(MockStat { metric_type }))
    }

    #[test]
    fn should_return_counter_when_slate_metric_type_is_counter() {
        let mapped = gauge_for(SlateMetricType::Counter).metric_type();

        assert!(matches!(
            mapped,
            prometheus_client::metrics::MetricType::Counter
        ));
    }

    #[test]
    fn should_return_gauge_when_slate_metric_type_is_gauge() {
        let mapped = gauge_for(SlateMetricType::Gauge).metric_type();

        assert!(matches!(
            mapped,
            prometheus_client::metrics::MetricType::Gauge
        ));
    }
}