1use std::sync::Arc;
2
3use crate::storage::{MergeOptions, PutOptions};
4use crate::{
5 BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
6 storage::{
7 MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
8 WriteOptions, WriteResult,
9 },
10};
11use async_trait::async_trait;
12use bytes::Bytes;
13use slatedb::config::ScanOptions;
14use slatedb::{
15 Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
16 MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
17};
18use tokio::sync::watch;
19
20#[cfg(feature = "metrics")]
25#[derive(Debug)]
26struct ReadableStatGauge(std::sync::Arc<dyn slatedb::stats::ReadableStat>);
27
28#[cfg(feature = "metrics")]
29impl prometheus_client::encoding::EncodeMetric for ReadableStatGauge {
30 fn encode(
31 &self,
32 mut encoder: prometheus_client::encoding::MetricEncoder,
33 ) -> Result<(), std::fmt::Error> {
34 encoder.encode_gauge(&self.0.get())
35 }
36
37 fn metric_type(&self) -> prometheus_client::metrics::MetricType {
38 match self.0.metric_type() {
39 slatedb::stats::MetricType::Counter => prometheus_client::metrics::MetricType::Counter,
40 slatedb::stats::MetricType::Gauge => prometheus_client::metrics::MetricType::Gauge,
41 }
42 }
43}
44
45pub struct SlateDbMergeOperatorAdapter {
49 operator: Arc<dyn MergeOperator>,
50}
51
52impl SlateDbMergeOperatorAdapter {
53 fn new(operator: Arc<dyn MergeOperator>) -> Self {
54 Self { operator }
55 }
56}
57
58impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
59 fn merge(
60 &self,
61 key: &Bytes,
62 existing_value: Option<Bytes>,
63 value: Bytes,
64 ) -> Result<Bytes, MergeOperatorError> {
65 Ok(self.operator.merge_batch(key, existing_value, &[value]))
66 }
67
68 fn merge_batch(
69 &self,
70 key: &Bytes,
71 existing_value: Option<Bytes>,
72 operands: &[Bytes],
73 ) -> Result<Bytes, MergeOperatorError> {
74 if operands.is_empty() && existing_value.is_none() {
75 return Err(MergeOperatorError::EmptyBatch);
76 }
77 Ok(self.operator.merge_batch(key, existing_value, operands))
78 }
79}
80
81fn default_scan_options() -> ScanOptions {
83 ScanOptions {
84 durability_filter: Default::default(),
85 dirty: false,
86 read_ahead_bytes: 1024 * 1024,
87 cache_blocks: true,
88 max_fetch_tasks: 4,
89 }
90}
91
92pub struct SlateDbStorage {
97 pub(super) db: Arc<Db>,
98 durable_tx: watch::Sender<u64>,
99 durable_bridge_abort: tokio::task::AbortHandle,
100}
101
102impl SlateDbStorage {
103 pub fn new(db: Arc<Db>) -> Self {
105 let slate_rx = db.subscribe();
106 let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
107 let task = tokio::spawn({
108 let tx = durable_tx.clone();
109 async move {
110 let mut slate_rx = slate_rx;
111 while slate_rx.changed().await.is_ok() {
112 let durable_seq = slate_rx.borrow_and_update().durable_seq;
113 if tx.send(durable_seq).is_err() {
114 break;
115 }
116 }
117 }
118 });
119
120 Self {
121 db,
122 durable_tx,
123 durable_bridge_abort: task.abort_handle(),
124 }
125 }
126
127 pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
143 SlateDbMergeOperatorAdapter::new(operator)
144 }
145}
146
147#[async_trait]
148impl StorageRead for SlateDbStorage {
149 #[tracing::instrument(level = "trace", skip_all)]
153 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
154 let value = self
155 .db
156 .get(&key)
157 .await
158 .map_err(StorageError::from_storage)?;
159
160 match value {
161 Some(v) => Ok(Some(Record::new(key, v))),
162 None => Ok(None),
163 }
164 }
165
166 #[tracing::instrument(level = "trace", skip_all)]
167 async fn scan_iter(
168 &self,
169 range: BytesRange,
170 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
171 let iter = self
172 .db
173 .scan_with_options(range, &default_scan_options())
174 .await
175 .map_err(StorageError::from_storage)?;
176 Ok(Box::new(SlateDbIterator { iter }))
177 }
178}
179
180pub(super) struct SlateDbIterator {
181 iter: DbIterator,
182}
183
184#[async_trait]
185impl StorageIterator for SlateDbIterator {
186 #[tracing::instrument(level = "trace", skip_all)]
187 async fn next(&mut self) -> StorageResult<Option<Record>> {
188 match self.iter.next().await.map_err(StorageError::from_storage)? {
189 Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
190 None => Ok(None),
191 }
192 }
193}
194
195pub struct SlateDbStorageSnapshot {
199 snapshot: Arc<DbSnapshot>,
200}
201
202#[async_trait]
203impl StorageRead for SlateDbStorageSnapshot {
204 #[tracing::instrument(level = "trace", skip_all)]
205 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
206 let value = self
207 .snapshot
208 .get(&key)
209 .await
210 .map_err(StorageError::from_storage)?;
211
212 match value {
213 Some(v) => Ok(Some(Record::new(key, v))),
214 None => Ok(None),
215 }
216 }
217
218 #[tracing::instrument(level = "trace", skip_all)]
219 async fn scan_iter(
220 &self,
221 range: BytesRange,
222 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
223 let iter = self
224 .snapshot
225 .scan_with_options(range, &default_scan_options())
226 .await
227 .map_err(StorageError::from_storage)?;
228 Ok(Box::new(SlateDbIterator { iter }))
229 }
230}
231
232#[async_trait]
233impl StorageSnapshot for SlateDbStorageSnapshot {}
234
235#[async_trait]
236impl Storage for SlateDbStorage {
237 async fn apply_with_options(
238 &self,
239 records: Vec<RecordOp>,
240 options: WriteOptions,
241 ) -> StorageResult<WriteResult> {
242 let mut batch = WriteBatch::new();
243 for op in records {
244 match op {
245 RecordOp::Put(op) => {
246 batch.put_with_options(op.record.key, op.record.value, &op.options.into())
247 }
248 RecordOp::Merge(op) => {
249 batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
250 }
251 RecordOp::Delete(key) => batch.delete(key),
252 }
253 }
254 let slate_options = SlateDbWriteOptions {
255 await_durable: options.await_durable,
256 };
257 let write_handle = self
258 .db
259 .write_with_options(batch, &slate_options)
260 .await
261 .map_err(StorageError::from_storage)?;
262 Ok(WriteResult {
263 seqnum: write_handle.seqnum(),
264 })
265 }
266
267 async fn put_with_options(
268 &self,
269 records: Vec<PutRecordOp>,
270 options: WriteOptions,
271 ) -> StorageResult<WriteResult> {
272 let mut batch = WriteBatch::new();
273 for op in records {
274 batch.put_with_options(op.record.key, op.record.value, &op.options.into());
275 }
276 let slate_options = SlateDbWriteOptions {
277 await_durable: options.await_durable,
278 };
279 let write_handle = self
280 .db
281 .write_with_options(batch, &slate_options)
282 .await
283 .map_err(StorageError::from_storage)?;
284 Ok(WriteResult {
285 seqnum: write_handle.seqnum(),
286 })
287 }
288
289 async fn merge_with_options(
290 &self,
291 records: Vec<MergeRecordOp>,
292 options: WriteOptions,
293 ) -> StorageResult<WriteResult> {
294 let mut batch = WriteBatch::new();
295 for op in records {
296 batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
297 }
298 let slate_options = SlateDbWriteOptions {
299 await_durable: options.await_durable,
300 };
301 let write_handle = self
302 .db
303 .write_with_options(batch, &slate_options)
304 .await
305 .map_err(|e| {
306 let error_msg = e.to_string();
307 if error_msg.contains("merge operator") || error_msg.contains("not configured") {
308 StorageError::Storage(
309 "Merge operator not configured for this database".to_string(),
310 )
311 } else {
312 StorageError::from_storage(e)
313 }
314 })?;
315 Ok(WriteResult {
316 seqnum: write_handle.seqnum(),
317 })
318 }
319
320 fn subscribe_durable(&self) -> watch::Receiver<u64> {
321 self.durable_tx.subscribe()
322 }
323
324 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
325 let snapshot = self
326 .db
327 .snapshot()
328 .await
329 .map_err(StorageError::from_storage)?;
330 Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
331 }
332
333 async fn flush(&self) -> StorageResult<()> {
334 self.db.flush().await.map_err(StorageError::from_storage)?;
335 Ok(())
336 }
337
338 async fn close(&self) -> StorageResult<()> {
339 self.durable_bridge_abort.abort();
341 self.db.close().await.map_err(StorageError::from_storage)?;
342 Ok(())
343 }
344
345 #[cfg(feature = "metrics")]
346 fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
347 let stat_registry = self.db.metrics();
348 let mut seen = std::collections::HashSet::new();
349 for name in stat_registry.names() {
350 if let Some(stat) = stat_registry.lookup(name) {
351 let sanitized: String = name
352 .chars()
353 .map(|c| {
354 if c.is_ascii_alphanumeric() || c == '_' {
355 c
356 } else {
357 '_'
358 }
359 })
360 .collect();
361 let prom_name = format!("slatedb_{sanitized}");
362 if !seen.insert(prom_name.clone()) {
363 tracing::warn!(
364 "Duplicate metric name after sanitization: {prom_name:?} (from {name:?}, skipped)"
365 );
366 continue;
367 }
368 registry.register(
369 &prom_name,
370 format!("SlateDB {name}"),
371 ReadableStatGauge(stat),
372 );
373 }
374 }
375 }
376}
377
378impl From<Ttl> for slatedb::config::Ttl {
379 fn from(value: Ttl) -> Self {
380 match value {
381 Ttl::Default => slatedb::config::Ttl::Default,
382 Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
383 Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
384 }
385 }
386}
387
388impl From<PutOptions> for slatedb::config::PutOptions {
389 fn from(value: PutOptions) -> Self {
390 Self {
391 ttl: value.ttl.into(),
392 }
393 }
394}
395
396impl From<MergeOptions> for slatedb::config::MergeOptions {
397 fn from(value: MergeOptions) -> Self {
398 Self {
399 ttl: value.ttl.into(),
400 }
401 }
402}
403
404pub struct SlateDbStorageReader {
409 reader: Arc<DbReader>,
410}
411
412impl SlateDbStorageReader {
413 pub fn new(reader: Arc<DbReader>) -> Self {
415 Self { reader }
416 }
417}
418
419#[async_trait]
420impl StorageRead for SlateDbStorageReader {
421 #[tracing::instrument(level = "trace", skip_all)]
422 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
423 let value = self
424 .reader
425 .get(&key)
426 .await
427 .map_err(StorageError::from_storage)?;
428
429 match value {
430 Some(v) => Ok(Some(Record::new(key, v))),
431 None => Ok(None),
432 }
433 }
434
435 #[tracing::instrument(level = "trace", skip_all)]
436 async fn scan_iter(
437 &self,
438 range: BytesRange,
439 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
440 let iter = self
441 .reader
442 .scan_with_options(range, &default_scan_options())
443 .await
444 .map_err(StorageError::from_storage)?;
445 Ok(Box::new(SlateDbIterator { iter }))
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452 use crate::BytesRange;
453 use slatedb::DbBuilder;
454 use slatedb::config::{DbReaderOptions, Settings};
455 use slatedb::object_store::memory::InMemory;
456 use slatedb_common::clock::MockSystemClock;
457
458 #[tokio::test]
459 async fn should_read_data_written_by_storage_via_reader() {
460 let object_store = Arc::new(InMemory::new());
461 let path = "/test/db";
462
463 let db = DbBuilder::new(path, object_store.clone())
465 .build()
466 .await
467 .unwrap();
468 let storage = SlateDbStorage::new(Arc::new(db));
469
470 storage
471 .put(vec![
472 Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
473 Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
474 ])
475 .await
476 .unwrap();
477 storage.flush().await.unwrap();
478
479 let reader = DbReader::open(path, object_store, None, Default::default())
481 .await
482 .unwrap();
483 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
484
485 let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
486 assert!(record.is_some());
487 assert_eq!(record.unwrap().value, Bytes::from("value1"));
488
489 let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
490 assert!(record.is_some());
491 assert_eq!(record.unwrap().value, Bytes::from("value2"));
492
493 let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
494 assert!(record.is_none());
495
496 storage.close().await.unwrap();
497 }
498
499 #[tokio::test]
500 async fn should_scan_data_written_by_storage_via_reader() {
501 let object_store = Arc::new(InMemory::new());
502 let path = "/test/db";
503
504 let db = DbBuilder::new(path, object_store.clone())
506 .build()
507 .await
508 .unwrap();
509 let storage = SlateDbStorage::new(Arc::new(db));
510
511 storage
512 .put(vec![
513 Record::new(Bytes::from("a"), Bytes::from("1")).into(),
514 Record::new(Bytes::from("b"), Bytes::from("2")).into(),
515 Record::new(Bytes::from("c"), Bytes::from("3")).into(),
516 ])
517 .await
518 .unwrap();
519 storage.flush().await.unwrap();
520
521 let reader = DbReader::open(path, object_store, None, Default::default())
523 .await
524 .unwrap();
525 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
526
527 let mut iter = storage_reader
528 .scan_iter(BytesRange::unbounded())
529 .await
530 .unwrap();
531 let mut results = Vec::new();
532 while let Some(record) = iter.next().await.unwrap() {
533 results.push((record.key, record.value));
534 }
535
536 assert_eq!(results.len(), 3);
537 assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
538 assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
539 assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));
540
541 storage.close().await.unwrap();
542 }
543
544 #[tokio::test]
545 async fn should_coexist_writer_and_reader_without_fencing_error() {
546 let object_store = Arc::new(InMemory::new());
547 let path = "/test/db";
548
549 let db = DbBuilder::new(path, object_store.clone())
551 .build()
552 .await
553 .unwrap();
554 let storage = SlateDbStorage::new(Arc::new(db));
555
556 storage
558 .put(vec![
559 Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
560 ])
561 .await
562 .unwrap();
563 storage.flush().await.unwrap();
564
565 let reader = DbReader::open(path, object_store, None, Default::default())
567 .await
568 .unwrap();
569 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
570
571 let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
573 assert!(record.is_some());
574 assert_eq!(record.unwrap().value, Bytes::from("value1"));
575
576 storage
578 .put(vec![
579 Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
580 ])
581 .await
582 .unwrap();
583 storage.flush().await.unwrap();
584
585 storage.close().await.unwrap();
586 }
587
588 #[tokio::test]
589 async fn should_expire_records_based_on_ttl() {
590 let object_store = Arc::new(InMemory::new());
592 let path = "/test/ttl_db";
593 let clock = Arc::new(MockSystemClock::new());
594
595 let db = DbBuilder::new(path, object_store)
596 .with_settings(Settings {
597 default_ttl: Some(30_000),
598 ..Default::default()
599 })
600 .with_system_clock(clock.clone())
601 .build()
602 .await
603 .unwrap();
604 let storage = SlateDbStorage::new(Arc::new(db));
605
606 storage
611 .put(vec![
612 PutRecordOp::new_with_options(
613 Record::new(Bytes::from("key1"), Bytes::from("value1")),
614 PutOptions {
615 ttl: Ttl::ExpireAfter(20_000),
616 },
617 ),
618 PutRecordOp::new_with_options(
619 Record::new(Bytes::from("key2"), Bytes::from("value2")),
620 PutOptions { ttl: Ttl::Default },
621 ),
622 PutRecordOp::new_with_options(
623 Record::new(Bytes::from("key3"), Bytes::from("value3")),
624 PutOptions { ttl: Ttl::NoExpiry },
625 ),
626 ])
627 .await
628 .unwrap();
629
630 assert!(storage.get(Bytes::from("key1")).await.unwrap().is_some());
632 assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
633 assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());
634
635 clock.set(25_000);
637
638 assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
640 assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
641 assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());
642
643 clock.set(35_000);
645
646 assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
648 assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
649 let record = storage.get(Bytes::from("key3")).await.unwrap();
650 assert!(record.is_some());
651 assert_eq!(record.unwrap().value, Bytes::from("value3"));
652
653 storage.close().await.unwrap();
654 }
655
656 struct ConcatMergeOperator;
658
659 impl MergeOperator for ConcatMergeOperator {
660 fn merge_batch(
661 &self,
662 _key: &Bytes,
663 existing_value: Option<Bytes>,
664 operands: &[Bytes],
665 ) -> Bytes {
666 let mut result = existing_value.unwrap_or_default().to_vec();
667 for operand in operands {
668 result.extend_from_slice(operand);
669 }
670 Bytes::from(result)
671 }
672 }
673
674 #[tokio::test]
675 async fn should_expire_merge_records_based_on_ttl() {
676 let object_store = Arc::new(InMemory::new());
678 let path = "/test/merge_ttl_db";
679 let clock = Arc::new(MockSystemClock::new());
680
681 let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
682 let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
683 let db = DbBuilder::new(path, object_store)
684 .with_settings(Settings {
685 default_ttl: Some(30_000),
686 ..Default::default()
687 })
688 .with_system_clock(clock.clone())
689 .with_merge_operator(Arc::new(slate_merge_op))
690 .build()
691 .await
692 .unwrap();
693 let storage = SlateDbStorage::new(Arc::new(db));
694
695 storage
700 .merge(vec![
701 MergeRecordOp::new_with_ttl(
702 Record::new(Bytes::from("key1"), Bytes::from("v1")),
703 MergeOptions {
704 ttl: Ttl::ExpireAfter(20_000),
705 },
706 ),
707 MergeRecordOp::new_with_ttl(
708 Record::new(Bytes::from("key2"), Bytes::from("v2")),
709 MergeOptions { ttl: Ttl::Default },
710 ),
711 MergeRecordOp::new_with_ttl(
712 Record::new(Bytes::from("key3"), Bytes::from("v3")),
713 MergeOptions { ttl: Ttl::NoExpiry },
714 ),
715 ])
716 .await
717 .unwrap();
718
719 assert_eq!(
721 storage
722 .get(Bytes::from("key1"))
723 .await
724 .unwrap()
725 .unwrap()
726 .value,
727 Bytes::from("v1")
728 );
729 assert_eq!(
730 storage
731 .get(Bytes::from("key2"))
732 .await
733 .unwrap()
734 .unwrap()
735 .value,
736 Bytes::from("v2")
737 );
738 assert_eq!(
739 storage
740 .get(Bytes::from("key3"))
741 .await
742 .unwrap()
743 .unwrap()
744 .value,
745 Bytes::from("v3")
746 );
747
748 clock.set(25_000);
750
751 assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
753 assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
754 assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());
755
756 clock.set(35_000);
758
759 assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
761 assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
762 let record = storage.get(Bytes::from("key3")).await.unwrap();
763 assert!(record.is_some());
764 assert_eq!(record.unwrap().value, Bytes::from("v3"));
765
766 storage.close().await.unwrap();
767 }
768
769 async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
772 reader_can_see_with_merge_op(path, object_store, key, None).await
773 }
774
775 async fn reader_can_see_with_merge_op(
776 path: &str,
777 object_store: Arc<InMemory>,
778 key: &str,
779 merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
780 ) -> bool {
781 let options = DbReaderOptions {
782 merge_operator: merge_op,
783 ..Default::default()
784 };
785 let reader = DbReader::open(path, object_store, None, options)
786 .await
787 .unwrap();
788 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
789 storage_reader
790 .get(Bytes::from(key.to_owned()))
791 .await
792 .unwrap()
793 .is_some()
794 }
795
796 #[tokio::test]
797 async fn put_defaults_to_not_await_durable() {
798 let object_store = Arc::new(InMemory::new());
799 let path = "/test/put_default_durability";
800
801 let db = DbBuilder::new(path, object_store.clone())
802 .build()
803 .await
804 .unwrap();
805 let storage = SlateDbStorage::new(Arc::new(db));
806
807 storage
809 .put(vec![
810 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
811 ])
812 .await
813 .unwrap();
814
815 assert!(!reader_can_see(path, object_store.clone(), "k1").await);
817
818 storage.flush().await.unwrap();
820 assert!(reader_can_see(path, object_store.clone(), "k1").await);
821
822 storage.close().await.unwrap();
823 }
824
825 #[tokio::test]
826 async fn put_with_await_durable_true_is_visible_to_reader() {
827 let object_store = Arc::new(InMemory::new());
828 let path = "/test/put_durable";
829
830 let db = DbBuilder::new(path, object_store.clone())
831 .build()
832 .await
833 .unwrap();
834 let storage = SlateDbStorage::new(Arc::new(db));
835
836 storage
838 .put_with_options(
839 vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
840 WriteOptions {
841 await_durable: true,
842 },
843 )
844 .await
845 .unwrap();
846
847 assert!(reader_can_see(path, object_store.clone(), "k1").await);
849
850 storage.close().await.unwrap();
851 }
852
853 #[tokio::test]
854 async fn apply_defaults_to_not_await_durable() {
855 let object_store = Arc::new(InMemory::new());
856 let path = "/test/apply_default_durability";
857
858 let db = DbBuilder::new(path, object_store.clone())
859 .build()
860 .await
861 .unwrap();
862 let storage = SlateDbStorage::new(Arc::new(db));
863
864 storage
866 .apply(vec![RecordOp::Put(
867 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
868 )])
869 .await
870 .unwrap();
871
872 assert!(!reader_can_see(path, object_store.clone(), "k1").await);
873
874 storage.flush().await.unwrap();
875 assert!(reader_can_see(path, object_store.clone(), "k1").await);
876
877 storage.close().await.unwrap();
878 }
879
880 #[tokio::test]
881 async fn apply_with_await_durable_true_is_visible_to_reader() {
882 let object_store = Arc::new(InMemory::new());
883 let path = "/test/apply_durable";
884
885 let db = DbBuilder::new(path, object_store.clone())
886 .build()
887 .await
888 .unwrap();
889 let storage = SlateDbStorage::new(Arc::new(db));
890
891 storage
892 .apply_with_options(
893 vec![RecordOp::Put(
894 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
895 )],
896 WriteOptions {
897 await_durable: true,
898 },
899 )
900 .await
901 .unwrap();
902
903 assert!(reader_can_see(path, object_store.clone(), "k1").await);
904
905 storage.close().await.unwrap();
906 }
907
908 #[tokio::test]
909 async fn merge_defaults_to_not_await_durable() {
910 let object_store = Arc::new(InMemory::new());
911 let path = "/test/merge_default_durability";
912
913 let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
914 let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
915 let db = DbBuilder::new(path, object_store.clone())
916 .with_merge_operator(slate_merge_op.clone())
917 .build()
918 .await
919 .unwrap();
920 let storage = SlateDbStorage::new(Arc::new(db));
921
922 storage
924 .merge(vec![
925 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
926 ])
927 .await
928 .unwrap();
929
930 let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
931 Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
932 assert!(
933 !reader_can_see_with_merge_op(
934 path,
935 object_store.clone(),
936 "k1",
937 Some(reader_merge_op.clone()),
938 )
939 .await
940 );
941
942 storage.flush().await.unwrap();
943 assert!(
944 reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
945 .await
946 );
947
948 storage.close().await.unwrap();
949 }
950
951 #[tokio::test]
952 async fn merge_with_await_durable_true_is_visible_to_reader() {
953 let object_store = Arc::new(InMemory::new());
954 let path = "/test/merge_durable";
955
956 let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
957 let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
958 let db = DbBuilder::new(path, object_store.clone())
959 .with_merge_operator(slate_merge_op.clone())
960 .build()
961 .await
962 .unwrap();
963 let storage = SlateDbStorage::new(Arc::new(db));
964
965 storage
966 .merge_with_options(
967 vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
968 WriteOptions {
969 await_durable: true,
970 },
971 )
972 .await
973 .unwrap();
974
975 let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
976 Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
977 assert!(
978 reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
979 .await
980 );
981
982 storage.close().await.unwrap();
983 }
984}
985
986#[cfg(all(test, feature = "metrics"))]
987mod metrics_tests {
988 use super::ReadableStatGauge;
989 use prometheus_client::encoding::EncodeMetric;
990 use slatedb::stats::{MetricType as SlateMetricType, ReadableStat};
991 use std::sync::Arc;
992
993 #[derive(Debug)]
994 struct MockStat {
995 metric_type: SlateMetricType,
996 }
997
998 impl ReadableStat for MockStat {
999 fn get(&self) -> i64 {
1000 0
1001 }
1002
1003 fn metric_type(&self) -> SlateMetricType {
1004 self.metric_type
1005 }
1006 }
1007
1008 #[test]
1009 fn should_return_counter_when_slate_metric_type_is_counter() {
1010 let stat = Arc::new(MockStat {
1012 metric_type: SlateMetricType::Counter,
1013 });
1014 let gauge = ReadableStatGauge(stat);
1015
1016 let result = gauge.metric_type();
1018
1019 assert!(matches!(
1021 result,
1022 prometheus_client::metrics::MetricType::Counter,
1023 ));
1024 }
1025
1026 #[test]
1027 fn should_return_gauge_when_slate_metric_type_is_gauge() {
1028 let stat = Arc::new(MockStat {
1030 metric_type: SlateMetricType::Gauge,
1031 });
1032 let gauge = ReadableStatGauge(stat);
1033
1034 let result = gauge.metric_type();
1036
1037 assert!(matches!(
1039 result,
1040 prometheus_client::metrics::MetricType::Gauge,
1041 ));
1042 }
1043}