1use std::sync::Arc;
2
3use crate::storage::{MergeOptions, PutOptions};
4use crate::{
5 BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
6 storage::{
7 MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
8 WriteOptions, WriteResult,
9 },
10};
11use async_trait::async_trait;
12use bytes::Bytes;
13use slatedb::config::ScanOptions;
14use slatedb::{
15 Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
16 MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
17};
18use tokio::sync::watch;
19
/// Newtype that lets a SlateDB stat handle be registered with a
/// `prometheus_client` registry.
#[cfg(feature = "metrics")]
#[derive(Debug)]
struct ReadableStatGauge(Arc<dyn slatedb::stats::ReadableStat>);

#[cfg(feature = "metrics")]
impl prometheus_client::encoding::EncodeMetric for ReadableStatGauge {
    /// Reads the stat's current value and writes it with the gauge encoding.
    ///
    /// Note that the gauge encoding is used regardless of what
    /// [`Self::metric_type`] reports.
    fn encode(
        &self,
        mut encoder: prometheus_client::encoding::MetricEncoder,
    ) -> Result<(), std::fmt::Error> {
        let current = self.0.get();
        encoder.encode_gauge(&current)
    }

    /// Translates the wrapped stat's SlateDB metric type into the matching
    /// `prometheus_client` metric type.
    fn metric_type(&self) -> prometheus_client::metrics::MetricType {
        use prometheus_client::metrics::MetricType as PromType;
        use slatedb::stats::MetricType as SlateType;

        match self.0.metric_type() {
            SlateType::Counter => PromType::Counter,
            SlateType::Gauge => PromType::Gauge,
        }
    }
}
44
45pub struct SlateDbMergeOperatorAdapter {
49 operator: Arc<dyn MergeOperator>,
50}
51
52impl SlateDbMergeOperatorAdapter {
53 fn new(operator: Arc<dyn MergeOperator>) -> Self {
54 Self { operator }
55 }
56}
57
58impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
59 fn merge(
60 &self,
61 key: &Bytes,
62 existing_value: Option<Bytes>,
63 value: Bytes,
64 ) -> Result<Bytes, MergeOperatorError> {
65 Ok(self.operator.merge(key, existing_value, value))
66 }
67
68 fn merge_batch(
69 &self,
70 key: &Bytes,
71 existing_value: Option<Bytes>,
72 operands: &[Bytes],
73 ) -> Result<Bytes, MergeOperatorError> {
74 if operands.is_empty() && existing_value.is_none() {
75 return Err(MergeOperatorError::EmptyBatch);
76 }
77 Ok(self.operator.merge_batch(key, existing_value, operands))
78 }
79}
80
81fn default_scan_options() -> ScanOptions {
83 ScanOptions {
84 durability_filter: Default::default(),
85 dirty: false,
86 read_ahead_bytes: 1024 * 1024,
87 cache_blocks: true,
88 max_fetch_tasks: 4,
89 }
90}
91
92pub struct SlateDbStorage {
97 pub(super) db: Arc<Db>,
98 durable_tx: watch::Sender<u64>,
99 durable_bridge_abort: tokio::task::AbortHandle,
100}
101
102impl SlateDbStorage {
103 pub fn new(db: Arc<Db>) -> Self {
105 let slate_rx = db.subscribe();
106 let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
107 let task = tokio::spawn({
108 let tx = durable_tx.clone();
109 async move {
110 let mut slate_rx = slate_rx;
111 while slate_rx.changed().await.is_ok() {
112 let durable_seq = slate_rx.borrow_and_update().durable_seq;
113 if tx.send(durable_seq).is_err() {
114 break;
115 }
116 }
117 }
118 });
119
120 Self {
121 db,
122 durable_tx,
123 durable_bridge_abort: task.abort_handle(),
124 }
125 }
126
127 pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
143 SlateDbMergeOperatorAdapter::new(operator)
144 }
145}
146
147#[async_trait]
148impl StorageRead for SlateDbStorage {
149 #[tracing::instrument(level = "trace", skip_all)]
153 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
154 let value = self
155 .db
156 .get(&key)
157 .await
158 .map_err(StorageError::from_storage)?;
159
160 match value {
161 Some(v) => Ok(Some(Record::new(key, v))),
162 None => Ok(None),
163 }
164 }
165
166 #[tracing::instrument(level = "trace", skip_all)]
167 async fn scan_iter(
168 &self,
169 range: BytesRange,
170 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
171 let iter = self
172 .db
173 .scan_with_options(range, &default_scan_options())
174 .await
175 .map_err(StorageError::from_storage)?;
176 Ok(Box::new(SlateDbIterator { iter }))
177 }
178}
179
180pub(super) struct SlateDbIterator {
181 iter: DbIterator,
182}
183
184#[async_trait]
185impl StorageIterator for SlateDbIterator {
186 #[tracing::instrument(level = "trace", skip_all)]
187 async fn next(&mut self) -> StorageResult<Option<Record>> {
188 match self.iter.next().await.map_err(StorageError::from_storage)? {
189 Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
190 None => Ok(None),
191 }
192 }
193}
194
195pub struct SlateDbStorageSnapshot {
199 snapshot: Arc<DbSnapshot>,
200}
201
202#[async_trait]
203impl StorageRead for SlateDbStorageSnapshot {
204 #[tracing::instrument(level = "trace", skip_all)]
205 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
206 let value = self
207 .snapshot
208 .get(&key)
209 .await
210 .map_err(StorageError::from_storage)?;
211
212 match value {
213 Some(v) => Ok(Some(Record::new(key, v))),
214 None => Ok(None),
215 }
216 }
217
218 #[tracing::instrument(level = "trace", skip_all)]
219 async fn scan_iter(
220 &self,
221 range: BytesRange,
222 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
223 let iter = self
224 .snapshot
225 .scan_with_options(range, &default_scan_options())
226 .await
227 .map_err(StorageError::from_storage)?;
228 Ok(Box::new(SlateDbIterator { iter }))
229 }
230}
231
232#[async_trait]
233impl StorageSnapshot for SlateDbStorageSnapshot {}
234
235#[async_trait]
236impl Storage for SlateDbStorage {
237 async fn apply_with_options(
238 &self,
239 records: Vec<RecordOp>,
240 options: WriteOptions,
241 ) -> StorageResult<WriteResult> {
242 let mut batch = WriteBatch::new();
243 for op in records {
244 match op {
245 RecordOp::Put(op) => {
246 batch.put_with_options(op.record.key, op.record.value, &op.options.into())
247 }
248 RecordOp::Merge(op) => {
249 batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
250 }
251 RecordOp::Delete(key) => batch.delete(key),
252 }
253 }
254 let slate_options = SlateDbWriteOptions {
255 await_durable: options.await_durable,
256 };
257 let write_handle = self
258 .db
259 .write_with_options(batch, &slate_options)
260 .await
261 .map_err(StorageError::from_storage)?;
262 Ok(WriteResult {
263 seqnum: write_handle.seqnum(),
264 })
265 }
266
267 async fn put_with_options(
268 &self,
269 records: Vec<PutRecordOp>,
270 options: WriteOptions,
271 ) -> StorageResult<WriteResult> {
272 let mut batch = WriteBatch::new();
273 for op in records {
274 batch.put_with_options(op.record.key, op.record.value, &op.options.into());
275 }
276 let slate_options = SlateDbWriteOptions {
277 await_durable: options.await_durable,
278 };
279 let write_handle = self
280 .db
281 .write_with_options(batch, &slate_options)
282 .await
283 .map_err(StorageError::from_storage)?;
284 Ok(WriteResult {
285 seqnum: write_handle.seqnum(),
286 })
287 }
288
289 async fn merge_with_options(
290 &self,
291 records: Vec<MergeRecordOp>,
292 options: WriteOptions,
293 ) -> StorageResult<WriteResult> {
294 let mut batch = WriteBatch::new();
295 for op in records {
296 batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
297 }
298 let slate_options = SlateDbWriteOptions {
299 await_durable: options.await_durable,
300 };
301 let write_handle = self
302 .db
303 .write_with_options(batch, &slate_options)
304 .await
305 .map_err(|e| {
306 let error_msg = e.to_string();
307 if error_msg.contains("merge operator") || error_msg.contains("not configured") {
308 StorageError::Storage(
309 "Merge operator not configured for this database".to_string(),
310 )
311 } else {
312 StorageError::from_storage(e)
313 }
314 })?;
315 Ok(WriteResult {
316 seqnum: write_handle.seqnum(),
317 })
318 }
319
320 fn subscribe_durable(&self) -> watch::Receiver<u64> {
321 self.durable_tx.subscribe()
322 }
323
324 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
325 let snapshot = self
326 .db
327 .snapshot()
328 .await
329 .map_err(StorageError::from_storage)?;
330 Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
331 }
332
333 async fn flush(&self) -> StorageResult<()> {
334 self.db.flush().await.map_err(StorageError::from_storage)?;
335 Ok(())
336 }
337
338 async fn close(&self) -> StorageResult<()> {
339 self.durable_bridge_abort.abort();
341 self.db.close().await.map_err(StorageError::from_storage)?;
342 Ok(())
343 }
344
345 #[cfg(feature = "metrics")]
346 fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
347 let stat_registry = self.db.metrics();
348 let mut seen = std::collections::HashSet::new();
349 for name in stat_registry.names() {
350 if let Some(stat) = stat_registry.lookup(name) {
351 let sanitized: String = name
352 .chars()
353 .map(|c| {
354 if c.is_ascii_alphanumeric() || c == '_' {
355 c
356 } else {
357 '_'
358 }
359 })
360 .collect();
361 let prom_name = format!("slatedb_{sanitized}");
362 if !seen.insert(prom_name.clone()) {
363 tracing::warn!(
364 "Duplicate metric name after sanitization: {prom_name:?} (from {name:?}, skipped)"
365 );
366 continue;
367 }
368 registry.register(
369 &prom_name,
370 format!("SlateDB {name}"),
371 ReadableStatGauge(stat),
372 );
373 }
374 }
375 }
376}
377
378impl From<Ttl> for slatedb::config::Ttl {
379 fn from(value: Ttl) -> Self {
380 match value {
381 Ttl::Default => slatedb::config::Ttl::Default,
382 Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
383 Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
384 }
385 }
386}
387
388impl From<PutOptions> for slatedb::config::PutOptions {
389 fn from(value: PutOptions) -> Self {
390 Self {
391 ttl: value.ttl.into(),
392 }
393 }
394}
395
396impl From<MergeOptions> for slatedb::config::MergeOptions {
397 fn from(value: MergeOptions) -> Self {
398 Self {
399 ttl: value.ttl.into(),
400 }
401 }
402}
403
404pub struct SlateDbStorageReader {
409 reader: Arc<DbReader>,
410}
411
412impl SlateDbStorageReader {
413 pub fn new(reader: Arc<DbReader>) -> Self {
415 Self { reader }
416 }
417}
418
419#[async_trait]
420impl StorageRead for SlateDbStorageReader {
421 #[tracing::instrument(level = "trace", skip_all)]
422 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
423 let value = self
424 .reader
425 .get(&key)
426 .await
427 .map_err(StorageError::from_storage)?;
428
429 match value {
430 Some(v) => Ok(Some(Record::new(key, v))),
431 None => Ok(None),
432 }
433 }
434
435 #[tracing::instrument(level = "trace", skip_all)]
436 async fn scan_iter(
437 &self,
438 range: BytesRange,
439 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
440 let iter = self
441 .reader
442 .scan_with_options(range, &default_scan_options())
443 .await
444 .map_err(StorageError::from_storage)?;
445 Ok(Box::new(SlateDbIterator { iter }))
446 }
447}
448
// Integration-style tests that run the storage wrappers against an in-memory
// object store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::config::{DbReaderOptions, Settings};
    use slatedb::object_store::memory::InMemory;
    use slatedb_common::clock::MockSystemClock;

    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: two records are written and flushed
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // and: a reader is opened on the same path and object store
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // then: both written keys are readable and an unwritten key is absent
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: three records are written and flushed
        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // and: a reader scans the full key range
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        // then: all three records come back in key order
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: the writer stores and flushes a first record
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        // and: a reader is opened while the writer is still open
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        // then: the reader sees the first record
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // and: the writer can still write and flush after the reader was opened
        // (each `unwrap` would panic on any error, including a fencing error)
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_expire_records_based_on_ttl() {
        // given: a database with a default TTL of 30_000 driven by a mock clock
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: three records are written, one per `Ttl` variant
        //   key1 -> explicit expiry of 20_000
        //   key2 -> database default TTL
        //   key3 -> no expiry
        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then: everything is visible before the clock moves
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when: the clock is set to 25_000
        // (assumes the mock clock started at 0, so this is past key1's 20_000
        // expiry but short of the 30_000 default — TODO confirm)
        clock.set(25_000);

        // then: only key1 has expired
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when: the clock is set to 35_000
        clock.set(35_000);

        // then: key2 has expired as well, while key3 (no expiry) remains
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value3"));

        storage.close().await.unwrap();
    }

    /// Test merge operator that appends the new operand to the existing value.
    struct ConcatMergeOperator;

    impl MergeOperator for ConcatMergeOperator {
        /// Returns `existing_value` followed by `new_value`, or just
        /// `new_value` when there is no existing value.
        fn merge(&self, _key: &Bytes, existing_value: Option<Bytes>, new_value: Bytes) -> Bytes {
            match existing_value {
                Some(existing) => {
                    let mut result = Vec::with_capacity(existing.len() + new_value.len());
                    result.extend_from_slice(&existing);
                    result.extend_from_slice(&new_value);
                    Bytes::from(result)
                }
                None => new_value,
            }
        }
    }

    #[tokio::test]
    async fn should_expire_merge_records_based_on_ttl() {
        // given: a database with a default TTL of 30_000, a mock clock, and
        // the concatenating merge operator installed through the adapter
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: three merge operands are written, one per `Ttl` variant
        //   key1 -> explicit expiry of 20_000
        //   key2 -> database default TTL
        //   key3 -> no expiry
        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // then: every key resolves to its single operand before the clock moves
        assert_eq!(
            storage
                .get(Bytes::from("key1"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v1")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key2"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v2")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key3"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v3")
        );

        // when: the clock is set to 25_000
        // (assumes the mock clock started at 0 — TODO confirm)
        clock.set(25_000);

        // then: only key1 has expired
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());

        // when: the clock is set to 35_000
        clock.set(35_000);

        // then: key2 has expired as well, while key3 (no expiry) remains
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("v3"));

        storage.close().await.unwrap();
    }

    /// Opens a fresh reader (without a merge operator) at `path` and reports
    /// whether `key` is readable through it.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    /// Opens a fresh reader at `path`, configured with the optional
    /// `merge_op`, and reports whether `key` is readable through it.
    ///
    /// Panics (via `unwrap`) if the reader cannot be opened or the read fails.
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let options = DbReaderOptions {
            merge_operator: merge_op,
            ..Default::default()
        };
        let reader = DbReader::open(path, object_store, None, options)
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }

    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: a record is written through `put` (no explicit write options)
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // then: a freshly opened reader does not see it yet
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // and: after an explicit flush a fresh reader does
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: a record is written with `await_durable: true`
        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // then: a freshly opened reader sees it without an explicit flush
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: a put is applied through `apply` (no explicit write options)
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        // then: a freshly opened reader does not see it yet
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        // and: after an explicit flush a fresh reader does
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        // given: a writer over an in-memory object store
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: a put is applied with `await_durable: true`
        storage
            .apply_with_options(
                vec![RecordOp::Put(
                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                )],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // then: a freshly opened reader sees it without an explicit flush
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        // given: a writer configured with the concatenating merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: an operand is written through `merge` (no explicit write options)
        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        // then: a freshly opened reader (given its own adapter over the same
        // operator) does not see it yet
        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        assert!(
            !reader_can_see_with_merge_op(
                path,
                object_store.clone(),
                "k1",
                Some(reader_merge_op.clone()),
            )
            .await
        );

        // and: after an explicit flush a fresh reader does
        storage.flush().await.unwrap();
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_with_await_durable_true_is_visible_to_reader() {
        // given: a writer configured with the concatenating merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_durable";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when: an operand is merged with `await_durable: true`
        storage
            .merge_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        // then: a freshly opened reader sees it without an explicit flush
        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }
}
984
// Unit tests for the SlateDB -> prometheus metric-type mapping.
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
    use super::ReadableStatGauge;
    use prometheus_client::encoding::EncodeMetric;
    use prometheus_client::metrics::MetricType as PromMetricType;
    use slatedb::stats::{MetricType as SlateMetricType, ReadableStat};
    use std::sync::Arc;

    /// Stat stub that always reads `0` and reports a configurable metric type.
    #[derive(Debug)]
    struct MockStat {
        metric_type: SlateMetricType,
    }

    impl ReadableStat for MockStat {
        fn get(&self) -> i64 {
            0
        }

        fn metric_type(&self) -> SlateMetricType {
            self.metric_type
        }
    }

    /// Builds the wrapper under test around a stub reporting `metric_type`.
    fn gauge_reporting(metric_type: SlateMetricType) -> ReadableStatGauge {
        ReadableStatGauge(Arc::new(MockStat { metric_type }))
    }

    #[test]
    fn should_return_counter_when_slate_metric_type_is_counter() {
        // given
        let gauge = gauge_reporting(SlateMetricType::Counter);

        // when
        let reported = gauge.metric_type();

        // then
        assert!(matches!(reported, PromMetricType::Counter));
    }

    #[test]
    fn should_return_gauge_when_slate_metric_type_is_gauge() {
        // given
        let gauge = gauge_reporting(SlateMetricType::Gauge);

        // when
        let reported = gauge.metric_type();

        // then
        assert!(matches!(reported, PromMetricType::Gauge));
    }
}