1use std::sync::Arc;
2
3use crate::storage::factory::OwnedHybridCache;
4use crate::storage::{MergeOptions, PutOptions};
5use crate::{
6 BytesRange, CheckpointInfo, Record, StorageError, StorageIterator, StorageRead, StorageResult,
7 Ttl,
8 storage::{
9 MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
10 WriteOptions, WriteResult,
11 },
12};
13use async_trait::async_trait;
14use bytes::Bytes;
15use slatedb::IterationOrder;
16use slatedb::config::{CheckpointOptions, CheckpointScope, ScanOptions};
17use slatedb::{
18 Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
19 MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
20};
21use tokio::sync::watch;
22use tracing::warn;
23
/// Bridges the storage-level [`MergeOperator`] trait onto SlateDB's native
/// merge-operator interface so it can be registered with the DB builder.
pub struct SlateDbMergeOperatorAdapter {
    // User-supplied operator that actually combines existing value + operands.
    operator: Arc<dyn MergeOperator>,
}

impl SlateDbMergeOperatorAdapter {
    // Private: construct via `SlateDbStorage::merge_operator_adapter`.
    fn new(operator: Arc<dyn MergeOperator>) -> Self {
        Self { operator }
    }
}
36
37impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
38 fn merge(
39 &self,
40 key: &Bytes,
41 existing_value: Option<Bytes>,
42 value: Bytes,
43 ) -> Result<Bytes, MergeOperatorError> {
44 Ok(self.operator.merge_batch(key, existing_value, &[value]))
45 }
46
47 fn merge_batch(
48 &self,
49 key: &Bytes,
50 existing_value: Option<Bytes>,
51 operands: &[Bytes],
52 ) -> Result<Bytes, MergeOperatorError> {
53 if operands.is_empty() && existing_value.is_none() {
54 return Err(MergeOperatorError::EmptyBatch);
55 }
56 Ok(self.operator.merge_batch(key, existing_value, operands))
57 }
58}
59
/// Scan options shared by every scan in this module (storage, snapshot,
/// and reader paths).
fn default_scan_options() -> ScanOptions {
    ScanOptions {
        durability_filter: Default::default(),
        // Do not surface writes that are not yet committed.
        dirty: false,
        // 1 MiB of read-ahead per scan.
        read_ahead_bytes: 1024 * 1024,
        cache_blocks: true,
        max_fetch_tasks: 4,
        order: IterationOrder::Ascending,
    }
}
71
/// [`Storage`] implementation backed by a SlateDB [`Db`] writer handle.
pub struct SlateDbStorage {
    // Visible to sibling modules (e.g. direct inspection in tests).
    pub(super) db: Arc<Db>,
    // Broadcasts the latest durable sequence number to `subscribe_durable` callers.
    durable_tx: watch::Sender<u64>,
    // Handle for the background task that forwards SlateDB's durability
    // watermark into `durable_tx`; aborted on `close`.
    durable_bridge_abort: tokio::task::AbortHandle,
    // Optional cache whose lifecycle this storage owns; closed best-effort on `close`.
    managed_cache: Option<OwnedHybridCache>,
}
84
impl SlateDbStorage {
    /// Creates a storage wrapper around `db` with no managed cache.
    pub fn new(db: Arc<Db>) -> Self {
        Self::new_with_managed_cache(db, None)
    }

    /// Creates a storage wrapper that also owns an optional hybrid cache.
    ///
    /// Spawns a background "bridge" task that forwards SlateDB's durable
    /// sequence number into our own watch channel, so callers can subscribe
    /// via `subscribe_durable` without depending on SlateDB types.
    pub(crate) fn new_with_managed_cache(
        db: Arc<Db>,
        managed_cache: Option<OwnedHybridCache>,
    ) -> Self {
        let slate_rx = db.subscribe();
        // Seed the channel with the current watermark so early subscribers
        // observe a meaningful initial value.
        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
        let task = tokio::spawn({
            let tx = durable_tx.clone();
            async move {
                let mut slate_rx = slate_rx;
                // Forward every durability update; exit when either end of
                // the bridge is dropped.
                while slate_rx.changed().await.is_ok() {
                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
                    if tx.send(durable_seq).is_err() {
                        break;
                    }
                }
            }
        });

        Self {
            db,
            durable_tx,
            durable_bridge_abort: task.abort_handle(),
            managed_cache,
        }
    }

    /// Wraps a storage-level merge operator so it can be handed to SlateDB's
    /// builder (`with_merge_operator`).
    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
        SlateDbMergeOperatorAdapter::new(operator)
    }
}
140
141#[async_trait]
142impl StorageRead for SlateDbStorage {
143 #[tracing::instrument(level = "trace", skip_all)]
147 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
148 let value = self
149 .db
150 .get(&key)
151 .await
152 .map_err(StorageError::from_storage)?;
153
154 match value {
155 Some(v) => Ok(Some(Record::new(key, v))),
156 None => Ok(None),
157 }
158 }
159
160 #[tracing::instrument(level = "trace", skip_all)]
161 async fn scan_iter(
162 &self,
163 range: BytesRange,
164 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
165 let iter = self
166 .db
167 .scan_with_options(range, &default_scan_options())
168 .await
169 .map_err(StorageError::from_storage)?;
170 Ok(Box::new(SlateDbIterator { iter }))
171 }
172
173 async fn close(&self) -> StorageResult<()> {
174 self.durable_bridge_abort.abort();
176 self.db.close().await.map_err(StorageError::from_storage)?;
177 if let Some(cache) = &self.managed_cache
182 && let Err(e) = cache.close().await
183 {
184 warn!(error = ?e, "foyer hybrid cache close failed");
185 }
186 Ok(())
187 }
188}
189
190pub(super) struct SlateDbIterator {
191 iter: DbIterator,
192}
193
194#[async_trait]
195impl StorageIterator for SlateDbIterator {
196 #[tracing::instrument(level = "trace", skip_all)]
197 async fn next(&mut self) -> StorageResult<Option<Record>> {
198 match self.iter.next().await.map_err(StorageError::from_storage)? {
199 Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
200 None => Ok(None),
201 }
202 }
203}
204
/// Point-in-time read view backed by a SlateDB [`DbSnapshot`].
pub struct SlateDbStorageSnapshot {
    snapshot: Arc<DbSnapshot>,
}
211
212#[async_trait]
213impl StorageRead for SlateDbStorageSnapshot {
214 #[tracing::instrument(level = "trace", skip_all)]
215 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
216 let value = self
217 .snapshot
218 .get(&key)
219 .await
220 .map_err(StorageError::from_storage)?;
221
222 match value {
223 Some(v) => Ok(Some(Record::new(key, v))),
224 None => Ok(None),
225 }
226 }
227
228 #[tracing::instrument(level = "trace", skip_all)]
229 async fn scan_iter(
230 &self,
231 range: BytesRange,
232 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
233 let iter = self
234 .snapshot
235 .scan_with_options(range, &default_scan_options())
236 .await
237 .map_err(StorageError::from_storage)?;
238 Ok(Box::new(SlateDbIterator { iter }))
239 }
240}
241
// Marker impl: a snapshot exposes exactly the `StorageRead` surface.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
244
245#[async_trait]
246impl Storage for SlateDbStorage {
247 async fn apply_with_options(
248 &self,
249 records: Vec<RecordOp>,
250 options: WriteOptions,
251 ) -> StorageResult<WriteResult> {
252 let mut batch = WriteBatch::new();
253 for op in records {
254 match op {
255 RecordOp::Put(op) => {
256 batch.put_with_options(op.record.key, op.record.value, &op.options.into())
257 }
258 RecordOp::Merge(op) => {
259 batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
260 }
261 RecordOp::Delete(key) => batch.delete(key),
262 }
263 }
264 let slate_options = SlateDbWriteOptions {
265 await_durable: options.await_durable,
266 };
267 let write_handle = self
268 .db
269 .write_with_options(batch, &slate_options)
270 .await
271 .map_err(StorageError::from_storage)?;
272 Ok(WriteResult {
273 seqnum: write_handle.seqnum(),
274 })
275 }
276
277 async fn put_with_options(
278 &self,
279 records: Vec<PutRecordOp>,
280 options: WriteOptions,
281 ) -> StorageResult<WriteResult> {
282 let mut batch = WriteBatch::new();
283 for op in records {
284 batch.put_with_options(op.record.key, op.record.value, &op.options.into());
285 }
286 let slate_options = SlateDbWriteOptions {
287 await_durable: options.await_durable,
288 };
289 let write_handle = self
290 .db
291 .write_with_options(batch, &slate_options)
292 .await
293 .map_err(StorageError::from_storage)?;
294 Ok(WriteResult {
295 seqnum: write_handle.seqnum(),
296 })
297 }
298
299 async fn merge_with_options(
300 &self,
301 records: Vec<MergeRecordOp>,
302 options: WriteOptions,
303 ) -> StorageResult<WriteResult> {
304 let mut batch = WriteBatch::new();
305 for op in records {
306 batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
307 }
308 let slate_options = SlateDbWriteOptions {
309 await_durable: options.await_durable,
310 };
311 let write_handle = self
312 .db
313 .write_with_options(batch, &slate_options)
314 .await
315 .map_err(|e| {
316 let error_msg = e.to_string();
317 if error_msg.contains("merge operator") || error_msg.contains("not configured") {
318 StorageError::Storage(
319 "Merge operator not configured for this database".to_string(),
320 )
321 } else {
322 StorageError::from_storage(e)
323 }
324 })?;
325 Ok(WriteResult {
326 seqnum: write_handle.seqnum(),
327 })
328 }
329
330 fn subscribe_durable(&self) -> watch::Receiver<u64> {
331 self.durable_tx.subscribe()
332 }
333
334 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
335 let snapshot = self
336 .db
337 .snapshot()
338 .await
339 .map_err(StorageError::from_storage)?;
340 Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
341 }
342
343 async fn flush(&self) -> StorageResult<()> {
344 self.db.flush().await.map_err(StorageError::from_storage)?;
345 Ok(())
346 }
347
348 async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo> {
349 let result = self
350 .db
351 .create_checkpoint(CheckpointScope::All, &CheckpointOptions::default())
352 .await
353 .map_err(StorageError::from_storage)?;
354 Ok(CheckpointInfo {
355 id: result.id,
356 manifest_id: result.manifest_id,
357 })
358 }
359}
360
361impl From<Ttl> for slatedb::config::Ttl {
362 fn from(value: Ttl) -> Self {
363 match value {
364 Ttl::Default => slatedb::config::Ttl::Default,
365 Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
366 Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
367 Ttl::ExpireAt(ts) => slatedb::config::Ttl::ExpireAt(ts),
368 }
369 }
370}
371
372impl From<PutOptions> for slatedb::config::PutOptions {
373 fn from(value: PutOptions) -> Self {
374 Self {
375 ttl: value.ttl.into(),
376 }
377 }
378}
379
380impl From<MergeOptions> for slatedb::config::MergeOptions {
381 fn from(value: MergeOptions) -> Self {
382 Self {
383 ttl: value.ttl.into(),
384 }
385 }
386}
387
/// Read-only [`StorageRead`] implementation backed by a SlateDB [`DbReader`].
pub struct SlateDbStorageReader {
    reader: Arc<DbReader>,
    // Optional cache owned by this reader; closed best-effort on `close`.
    managed_cache: Option<OwnedHybridCache>,
}
398
399impl SlateDbStorageReader {
400 pub fn new(reader: Arc<DbReader>) -> Self {
402 Self::new_with_managed_cache(reader, None)
403 }
404
405 pub(crate) fn new_with_managed_cache(
408 reader: Arc<DbReader>,
409 managed_cache: Option<OwnedHybridCache>,
410 ) -> Self {
411 Self {
412 reader,
413 managed_cache,
414 }
415 }
416}
417
418#[async_trait]
419impl StorageRead for SlateDbStorageReader {
420 #[tracing::instrument(level = "trace", skip_all)]
421 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
422 let value = self
423 .reader
424 .get(&key)
425 .await
426 .map_err(StorageError::from_storage)?;
427
428 match value {
429 Some(v) => Ok(Some(Record::new(key, v))),
430 None => Ok(None),
431 }
432 }
433
434 #[tracing::instrument(level = "trace", skip_all)]
435 async fn scan_iter(
436 &self,
437 range: BytesRange,
438 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
439 let iter = self
440 .reader
441 .scan_with_options(range, &default_scan_options())
442 .await
443 .map_err(StorageError::from_storage)?;
444 Ok(Box::new(SlateDbIterator { iter }))
445 }
446
447 async fn close(&self) -> StorageResult<()> {
448 self.reader
449 .close()
450 .await
451 .map_err(StorageError::from_storage)?;
452 if let Some(cache) = &self.managed_cache
455 && let Err(e) = cache.close().await
456 {
457 warn!(error = ?e, "foyer hybrid cache close failed");
458 }
459 Ok(())
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::config::Settings;
    use slatedb::object_store::memory::InMemory;
    use slatedb_common::clock::MockSystemClock;

    // Writes through `SlateDbStorage`, flushes, then verifies a separate
    // `DbReader`-backed storage sees the durable data (and misses absent keys).
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        // Flush so the reader (which only sees durable state) can observe the writes.
        storage.flush().await.unwrap();

        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));

        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());

        storage.close().await.unwrap();
    }

    // Full unbounded scan through the reader returns all rows in ascending
    // key order (matching `default_scan_options`).
    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }

        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));

        storage.close().await.unwrap();
    }

    // The writer keeps writing while a reader is open; neither side should
    // hit a fencing/ownership error.
    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        let reader = DbReader::builder(path, object_store).build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));

        // Write again while the reader is still alive.
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }

    // With a mock clock fixed at t=0 and a 30s default TTL, verifies that
    // per-record TTL options map to the expected `expire_ts` values.
    #[tokio::test]
    async fn should_set_expire_ts_based_on_ttl() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        // Explicit TTL: now (0) + 20s.
        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(kv1.expire_ts, Some(20_000));

        // Default TTL from Settings: now (0) + 30s.
        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(kv2.expire_ts, Some(30_000));

        // NoExpiry overrides the default TTL.
        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(kv3.expire_ts, None);

        storage.close().await.unwrap();
    }

    // Simple merge operator for tests: appends each operand to the existing value.
    struct ConcatMergeOperator;

    impl MergeOperator for ConcatMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            let mut result = existing_value.unwrap_or_default().to_vec();
            for operand in operands {
                result.extend_from_slice(operand);
            }
            Bytes::from(result)
        }
    }

    // Same TTL matrix as the put test, but via merge records.
    #[tokio::test]
    async fn should_set_expire_ts_on_merge_records_based_on_ttl() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();

        let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(kv1.value, Bytes::from("v1"));
        assert_eq!(kv1.expire_ts, Some(20_000));

        let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(kv2.value, Bytes::from("v2"));
        assert_eq!(kv2.expire_ts, Some(30_000));

        let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(kv3.value, Bytes::from("v3"));
        assert_eq!(kv3.expire_ts, None);

        storage.close().await.unwrap();
    }

    // Opens a fresh reader and checks whether `key` is visible (i.e. durable).
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    // Like `reader_can_see`, but allows configuring a merge operator on the
    // reader (required when the DB contains unresolved merge records).
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let mut builder = DbReader::builder(path, object_store);
        if let Some(op) = merge_op {
            builder = builder.with_merge_operator(op);
        }
        let reader = builder.build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }

    // `put` (default WriteOptions) must not await durability: a fresh reader
    // should not see the key until an explicit flush.
    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    // `put_with_options(await_durable: true)` makes the write durable before
    // returning, so a fresh reader sees it immediately.
    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    // Same default-durability contract for the heterogeneous `apply` path.
    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();

        assert!(!reader_can_see(path, object_store.clone(), "k1").await);

        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    // `apply_with_options(await_durable: true)` is immediately reader-visible.
    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";

        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .apply_with_options(
                vec![RecordOp::Put(
                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                )],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        assert!(reader_can_see(path, object_store.clone(), "k1").await);

        storage.close().await.unwrap();
    }

    // Same default-durability contract for the merge path; the reader needs
    // its own merge operator to resolve merge records.
    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();

        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        assert!(
            !reader_can_see_with_merge_op(
                path,
                object_store.clone(),
                "k1",
                Some(reader_merge_op.clone()),
            )
            .await
        );

        storage.flush().await.unwrap();
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }

    // `merge_with_options(await_durable: true)` is immediately reader-visible.
    #[tokio::test]
    async fn merge_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_durable";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        storage
            .merge_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();

        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );

        storage.close().await.unwrap();
    }
}