1use std::sync::Arc;
2
3use crate::storage::factory::OwnedHybridCache;
4use crate::storage::{MergeOptions, PutOptions};
5use crate::{
6 BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
7 storage::{
8 MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
9 WriteOptions, WriteResult,
10 },
11};
12use async_trait::async_trait;
13use bytes::Bytes;
14use slatedb::IterationOrder;
15use slatedb::config::ScanOptions;
16use slatedb::{
17 Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
18 MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
19};
20use tokio::sync::watch;
21use tracing::warn;
22
23pub struct SlateDbMergeOperatorAdapter {
27 operator: Arc<dyn MergeOperator>,
28}
29
30impl SlateDbMergeOperatorAdapter {
31 fn new(operator: Arc<dyn MergeOperator>) -> Self {
32 Self { operator }
33 }
34}
35
36impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
37 fn merge(
38 &self,
39 key: &Bytes,
40 existing_value: Option<Bytes>,
41 value: Bytes,
42 ) -> Result<Bytes, MergeOperatorError> {
43 Ok(self.operator.merge_batch(key, existing_value, &[value]))
44 }
45
46 fn merge_batch(
47 &self,
48 key: &Bytes,
49 existing_value: Option<Bytes>,
50 operands: &[Bytes],
51 ) -> Result<Bytes, MergeOperatorError> {
52 if operands.is_empty() && existing_value.is_none() {
53 return Err(MergeOperatorError::EmptyBatch);
54 }
55 Ok(self.operator.merge_batch(key, existing_value, operands))
56 }
57}
58
59fn default_scan_options() -> ScanOptions {
61 ScanOptions {
62 durability_filter: Default::default(),
63 dirty: false,
64 read_ahead_bytes: 1024 * 1024,
65 cache_blocks: true,
66 max_fetch_tasks: 4,
67 order: IterationOrder::Ascending,
68 }
69}
70
71pub struct SlateDbStorage {
76 pub(super) db: Arc<Db>,
77 durable_tx: watch::Sender<u64>,
78 durable_bridge_abort: tokio::task::AbortHandle,
79 managed_cache: Option<OwnedHybridCache>,
82}
83
84impl SlateDbStorage {
85 pub fn new(db: Arc<Db>) -> Self {
87 Self::new_with_managed_cache(db, None)
88 }
89
90 pub(crate) fn new_with_managed_cache(
94 db: Arc<Db>,
95 managed_cache: Option<OwnedHybridCache>,
96 ) -> Self {
97 let slate_rx = db.subscribe();
98 let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
99 let task = tokio::spawn({
100 let tx = durable_tx.clone();
101 async move {
102 let mut slate_rx = slate_rx;
103 while slate_rx.changed().await.is_ok() {
104 let durable_seq = slate_rx.borrow_and_update().durable_seq;
105 if tx.send(durable_seq).is_err() {
106 break;
107 }
108 }
109 }
110 });
111
112 Self {
113 db,
114 durable_tx,
115 durable_bridge_abort: task.abort_handle(),
116 managed_cache,
117 }
118 }
119
120 pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
136 SlateDbMergeOperatorAdapter::new(operator)
137 }
138}
139
140#[async_trait]
141impl StorageRead for SlateDbStorage {
142 #[tracing::instrument(level = "trace", skip_all)]
146 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
147 let value = self
148 .db
149 .get(&key)
150 .await
151 .map_err(StorageError::from_storage)?;
152
153 match value {
154 Some(v) => Ok(Some(Record::new(key, v))),
155 None => Ok(None),
156 }
157 }
158
159 #[tracing::instrument(level = "trace", skip_all)]
160 async fn scan_iter(
161 &self,
162 range: BytesRange,
163 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
164 let iter = self
165 .db
166 .scan_with_options(range, &default_scan_options())
167 .await
168 .map_err(StorageError::from_storage)?;
169 Ok(Box::new(SlateDbIterator { iter }))
170 }
171
172 async fn close(&self) -> StorageResult<()> {
173 self.durable_bridge_abort.abort();
175 self.db.close().await.map_err(StorageError::from_storage)?;
176 if let Some(cache) = &self.managed_cache
181 && let Err(e) = cache.close().await
182 {
183 warn!(error = ?e, "foyer hybrid cache close failed");
184 }
185 Ok(())
186 }
187}
188
189pub(super) struct SlateDbIterator {
190 iter: DbIterator,
191}
192
193#[async_trait]
194impl StorageIterator for SlateDbIterator {
195 #[tracing::instrument(level = "trace", skip_all)]
196 async fn next(&mut self) -> StorageResult<Option<Record>> {
197 match self.iter.next().await.map_err(StorageError::from_storage)? {
198 Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
199 None => Ok(None),
200 }
201 }
202}
203
204pub struct SlateDbStorageSnapshot {
208 snapshot: Arc<DbSnapshot>,
209}
210
211#[async_trait]
212impl StorageRead for SlateDbStorageSnapshot {
213 #[tracing::instrument(level = "trace", skip_all)]
214 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
215 let value = self
216 .snapshot
217 .get(&key)
218 .await
219 .map_err(StorageError::from_storage)?;
220
221 match value {
222 Some(v) => Ok(Some(Record::new(key, v))),
223 None => Ok(None),
224 }
225 }
226
227 #[tracing::instrument(level = "trace", skip_all)]
228 async fn scan_iter(
229 &self,
230 range: BytesRange,
231 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
232 let iter = self
233 .snapshot
234 .scan_with_options(range, &default_scan_options())
235 .await
236 .map_err(StorageError::from_storage)?;
237 Ok(Box::new(SlateDbIterator { iter }))
238 }
239}
240
241#[async_trait]
242impl StorageSnapshot for SlateDbStorageSnapshot {}
243
244#[async_trait]
245impl Storage for SlateDbStorage {
246 async fn apply_with_options(
247 &self,
248 records: Vec<RecordOp>,
249 options: WriteOptions,
250 ) -> StorageResult<WriteResult> {
251 let mut batch = WriteBatch::new();
252 for op in records {
253 match op {
254 RecordOp::Put(op) => {
255 batch.put_with_options(op.record.key, op.record.value, &op.options.into())
256 }
257 RecordOp::Merge(op) => {
258 batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
259 }
260 RecordOp::Delete(key) => batch.delete(key),
261 }
262 }
263 let slate_options = SlateDbWriteOptions {
264 await_durable: options.await_durable,
265 };
266 let write_handle = self
267 .db
268 .write_with_options(batch, &slate_options)
269 .await
270 .map_err(StorageError::from_storage)?;
271 Ok(WriteResult {
272 seqnum: write_handle.seqnum(),
273 })
274 }
275
276 async fn put_with_options(
277 &self,
278 records: Vec<PutRecordOp>,
279 options: WriteOptions,
280 ) -> StorageResult<WriteResult> {
281 let mut batch = WriteBatch::new();
282 for op in records {
283 batch.put_with_options(op.record.key, op.record.value, &op.options.into());
284 }
285 let slate_options = SlateDbWriteOptions {
286 await_durable: options.await_durable,
287 };
288 let write_handle = self
289 .db
290 .write_with_options(batch, &slate_options)
291 .await
292 .map_err(StorageError::from_storage)?;
293 Ok(WriteResult {
294 seqnum: write_handle.seqnum(),
295 })
296 }
297
298 async fn merge_with_options(
299 &self,
300 records: Vec<MergeRecordOp>,
301 options: WriteOptions,
302 ) -> StorageResult<WriteResult> {
303 let mut batch = WriteBatch::new();
304 for op in records {
305 batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
306 }
307 let slate_options = SlateDbWriteOptions {
308 await_durable: options.await_durable,
309 };
310 let write_handle = self
311 .db
312 .write_with_options(batch, &slate_options)
313 .await
314 .map_err(|e| {
315 let error_msg = e.to_string();
316 if error_msg.contains("merge operator") || error_msg.contains("not configured") {
317 StorageError::Storage(
318 "Merge operator not configured for this database".to_string(),
319 )
320 } else {
321 StorageError::from_storage(e)
322 }
323 })?;
324 Ok(WriteResult {
325 seqnum: write_handle.seqnum(),
326 })
327 }
328
329 fn subscribe_durable(&self) -> watch::Receiver<u64> {
330 self.durable_tx.subscribe()
331 }
332
333 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
334 let snapshot = self
335 .db
336 .snapshot()
337 .await
338 .map_err(StorageError::from_storage)?;
339 Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
340 }
341
342 async fn flush(&self) -> StorageResult<()> {
343 self.db.flush().await.map_err(StorageError::from_storage)?;
344 Ok(())
345 }
346}
347
348impl From<Ttl> for slatedb::config::Ttl {
349 fn from(value: Ttl) -> Self {
350 match value {
351 Ttl::Default => slatedb::config::Ttl::Default,
352 Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
353 Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
354 }
355 }
356}
357
358impl From<PutOptions> for slatedb::config::PutOptions {
359 fn from(value: PutOptions) -> Self {
360 Self {
361 ttl: value.ttl.into(),
362 }
363 }
364}
365
366impl From<MergeOptions> for slatedb::config::MergeOptions {
367 fn from(value: MergeOptions) -> Self {
368 Self {
369 ttl: value.ttl.into(),
370 }
371 }
372}
373
374pub struct SlateDbStorageReader {
379 reader: Arc<DbReader>,
380 managed_cache: Option<OwnedHybridCache>,
383}
384
385impl SlateDbStorageReader {
386 pub fn new(reader: Arc<DbReader>) -> Self {
388 Self::new_with_managed_cache(reader, None)
389 }
390
391 pub(crate) fn new_with_managed_cache(
394 reader: Arc<DbReader>,
395 managed_cache: Option<OwnedHybridCache>,
396 ) -> Self {
397 Self {
398 reader,
399 managed_cache,
400 }
401 }
402}
403
404#[async_trait]
405impl StorageRead for SlateDbStorageReader {
406 #[tracing::instrument(level = "trace", skip_all)]
407 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
408 let value = self
409 .reader
410 .get(&key)
411 .await
412 .map_err(StorageError::from_storage)?;
413
414 match value {
415 Some(v) => Ok(Some(Record::new(key, v))),
416 None => Ok(None),
417 }
418 }
419
420 #[tracing::instrument(level = "trace", skip_all)]
421 async fn scan_iter(
422 &self,
423 range: BytesRange,
424 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
425 let iter = self
426 .reader
427 .scan_with_options(range, &default_scan_options())
428 .await
429 .map_err(StorageError::from_storage)?;
430 Ok(Box::new(SlateDbIterator { iter }))
431 }
432
433 async fn close(&self) -> StorageResult<()> {
434 self.reader
435 .close()
436 .await
437 .map_err(StorageError::from_storage)?;
438 if let Some(cache) = &self.managed_cache
441 && let Err(e) = cache.close().await
442 {
443 warn!(error = ?e, "foyer hybrid cache close failed");
444 }
445 Ok(())
446 }
447}
448
449#[cfg(test)]
450mod tests {
451 use super::*;
452 use crate::BytesRange;
453 use slatedb::DbBuilder;
454 use slatedb::config::Settings;
455 use slatedb::object_store::memory::InMemory;
456 use slatedb_common::clock::MockSystemClock;
457
458 #[tokio::test]
459 async fn should_read_data_written_by_storage_via_reader() {
460 let object_store = Arc::new(InMemory::new());
461 let path = "/test/db";
462
463 let db = DbBuilder::new(path, object_store.clone())
465 .build()
466 .await
467 .unwrap();
468 let storage = SlateDbStorage::new(Arc::new(db));
469
470 storage
471 .put(vec![
472 Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
473 Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
474 ])
475 .await
476 .unwrap();
477 storage.flush().await.unwrap();
478
479 let reader = DbReader::builder(path, object_store).build().await.unwrap();
481 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
482
483 let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
484 assert!(record.is_some());
485 assert_eq!(record.unwrap().value, Bytes::from("value1"));
486
487 let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
488 assert!(record.is_some());
489 assert_eq!(record.unwrap().value, Bytes::from("value2"));
490
491 let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
492 assert!(record.is_none());
493
494 storage.close().await.unwrap();
495 }
496
497 #[tokio::test]
498 async fn should_scan_data_written_by_storage_via_reader() {
499 let object_store = Arc::new(InMemory::new());
500 let path = "/test/db";
501
502 let db = DbBuilder::new(path, object_store.clone())
504 .build()
505 .await
506 .unwrap();
507 let storage = SlateDbStorage::new(Arc::new(db));
508
509 storage
510 .put(vec![
511 Record::new(Bytes::from("a"), Bytes::from("1")).into(),
512 Record::new(Bytes::from("b"), Bytes::from("2")).into(),
513 Record::new(Bytes::from("c"), Bytes::from("3")).into(),
514 ])
515 .await
516 .unwrap();
517 storage.flush().await.unwrap();
518
519 let reader = DbReader::builder(path, object_store).build().await.unwrap();
521 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
522
523 let mut iter = storage_reader
524 .scan_iter(BytesRange::unbounded())
525 .await
526 .unwrap();
527 let mut results = Vec::new();
528 while let Some(record) = iter.next().await.unwrap() {
529 results.push((record.key, record.value));
530 }
531
532 assert_eq!(results.len(), 3);
533 assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
534 assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
535 assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));
536
537 storage.close().await.unwrap();
538 }
539
540 #[tokio::test]
541 async fn should_coexist_writer_and_reader_without_fencing_error() {
542 let object_store = Arc::new(InMemory::new());
543 let path = "/test/db";
544
545 let db = DbBuilder::new(path, object_store.clone())
547 .build()
548 .await
549 .unwrap();
550 let storage = SlateDbStorage::new(Arc::new(db));
551
552 storage
554 .put(vec![
555 Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
556 ])
557 .await
558 .unwrap();
559 storage.flush().await.unwrap();
560
561 let reader = DbReader::builder(path, object_store).build().await.unwrap();
563 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
564
565 let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
567 assert!(record.is_some());
568 assert_eq!(record.unwrap().value, Bytes::from("value1"));
569
570 storage
572 .put(vec![
573 Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
574 ])
575 .await
576 .unwrap();
577 storage.flush().await.unwrap();
578
579 storage.close().await.unwrap();
580 }
581
582 #[tokio::test]
583 async fn should_set_expire_ts_based_on_ttl() {
584 let object_store = Arc::new(InMemory::new());
586 let path = "/test/ttl_db";
587 let clock = Arc::new(MockSystemClock::new());
588
589 let db = DbBuilder::new(path, object_store)
590 .with_settings(Settings {
591 default_ttl: Some(30_000),
592 ..Default::default()
593 })
594 .with_system_clock(clock.clone())
595 .build()
596 .await
597 .unwrap();
598 let storage = SlateDbStorage::new(Arc::new(db));
599
600 storage
605 .put(vec![
606 PutRecordOp::new_with_options(
607 Record::new(Bytes::from("key1"), Bytes::from("value1")),
608 PutOptions {
609 ttl: Ttl::ExpireAfter(20_000),
610 },
611 ),
612 PutRecordOp::new_with_options(
613 Record::new(Bytes::from("key2"), Bytes::from("value2")),
614 PutOptions { ttl: Ttl::Default },
615 ),
616 PutRecordOp::new_with_options(
617 Record::new(Bytes::from("key3"), Bytes::from("value3")),
618 PutOptions { ttl: Ttl::NoExpiry },
619 ),
620 ])
621 .await
622 .unwrap();
623
624 let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
626 assert_eq!(kv1.expire_ts, Some(20_000));
627
628 let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
630 assert_eq!(kv2.expire_ts, Some(30_000));
631
632 let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
634 assert_eq!(kv3.expire_ts, None);
635
636 storage.close().await.unwrap();
637 }
638
639 struct ConcatMergeOperator;
641
642 impl MergeOperator for ConcatMergeOperator {
643 fn merge_batch(
644 &self,
645 _key: &Bytes,
646 existing_value: Option<Bytes>,
647 operands: &[Bytes],
648 ) -> Bytes {
649 let mut result = existing_value.unwrap_or_default().to_vec();
650 for operand in operands {
651 result.extend_from_slice(operand);
652 }
653 Bytes::from(result)
654 }
655 }
656
657 #[tokio::test]
658 async fn should_set_expire_ts_on_merge_records_based_on_ttl() {
659 let object_store = Arc::new(InMemory::new());
661 let path = "/test/merge_ttl_db";
662 let clock = Arc::new(MockSystemClock::new());
663
664 let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
665 let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
666 let db = DbBuilder::new(path, object_store)
667 .with_settings(Settings {
668 default_ttl: Some(30_000),
669 ..Default::default()
670 })
671 .with_system_clock(clock.clone())
672 .with_merge_operator(Arc::new(slate_merge_op))
673 .build()
674 .await
675 .unwrap();
676 let storage = SlateDbStorage::new(Arc::new(db));
677
678 storage
683 .merge(vec![
684 MergeRecordOp::new_with_ttl(
685 Record::new(Bytes::from("key1"), Bytes::from("v1")),
686 MergeOptions {
687 ttl: Ttl::ExpireAfter(20_000),
688 },
689 ),
690 MergeRecordOp::new_with_ttl(
691 Record::new(Bytes::from("key2"), Bytes::from("v2")),
692 MergeOptions { ttl: Ttl::Default },
693 ),
694 MergeRecordOp::new_with_ttl(
695 Record::new(Bytes::from("key3"), Bytes::from("v3")),
696 MergeOptions { ttl: Ttl::NoExpiry },
697 ),
698 ])
699 .await
700 .unwrap();
701
702 let kv1 = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
704 assert_eq!(kv1.value, Bytes::from("v1"));
705 assert_eq!(kv1.expire_ts, Some(20_000));
706
707 let kv2 = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
709 assert_eq!(kv2.value, Bytes::from("v2"));
710 assert_eq!(kv2.expire_ts, Some(30_000));
711
712 let kv3 = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
714 assert_eq!(kv3.value, Bytes::from("v3"));
715 assert_eq!(kv3.expire_ts, None);
716
717 storage.close().await.unwrap();
718 }
719
720 async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
723 reader_can_see_with_merge_op(path, object_store, key, None).await
724 }
725
726 async fn reader_can_see_with_merge_op(
727 path: &str,
728 object_store: Arc<InMemory>,
729 key: &str,
730 merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
731 ) -> bool {
732 let mut builder = DbReader::builder(path, object_store);
733 if let Some(op) = merge_op {
734 builder = builder.with_merge_operator(op);
735 }
736 let reader = builder.build().await.unwrap();
737 let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
738 storage_reader
739 .get(Bytes::from(key.to_owned()))
740 .await
741 .unwrap()
742 .is_some()
743 }
744
745 #[tokio::test]
746 async fn put_defaults_to_not_await_durable() {
747 let object_store = Arc::new(InMemory::new());
748 let path = "/test/put_default_durability";
749
750 let db = DbBuilder::new(path, object_store.clone())
751 .build()
752 .await
753 .unwrap();
754 let storage = SlateDbStorage::new(Arc::new(db));
755
756 storage
758 .put(vec![
759 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
760 ])
761 .await
762 .unwrap();
763
764 assert!(!reader_can_see(path, object_store.clone(), "k1").await);
766
767 storage.flush().await.unwrap();
769 assert!(reader_can_see(path, object_store.clone(), "k1").await);
770
771 storage.close().await.unwrap();
772 }
773
774 #[tokio::test]
775 async fn put_with_await_durable_true_is_visible_to_reader() {
776 let object_store = Arc::new(InMemory::new());
777 let path = "/test/put_durable";
778
779 let db = DbBuilder::new(path, object_store.clone())
780 .build()
781 .await
782 .unwrap();
783 let storage = SlateDbStorage::new(Arc::new(db));
784
785 storage
787 .put_with_options(
788 vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
789 WriteOptions {
790 await_durable: true,
791 },
792 )
793 .await
794 .unwrap();
795
796 assert!(reader_can_see(path, object_store.clone(), "k1").await);
798
799 storage.close().await.unwrap();
800 }
801
802 #[tokio::test]
803 async fn apply_defaults_to_not_await_durable() {
804 let object_store = Arc::new(InMemory::new());
805 let path = "/test/apply_default_durability";
806
807 let db = DbBuilder::new(path, object_store.clone())
808 .build()
809 .await
810 .unwrap();
811 let storage = SlateDbStorage::new(Arc::new(db));
812
813 storage
815 .apply(vec![RecordOp::Put(
816 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
817 )])
818 .await
819 .unwrap();
820
821 assert!(!reader_can_see(path, object_store.clone(), "k1").await);
822
823 storage.flush().await.unwrap();
824 assert!(reader_can_see(path, object_store.clone(), "k1").await);
825
826 storage.close().await.unwrap();
827 }
828
829 #[tokio::test]
830 async fn apply_with_await_durable_true_is_visible_to_reader() {
831 let object_store = Arc::new(InMemory::new());
832 let path = "/test/apply_durable";
833
834 let db = DbBuilder::new(path, object_store.clone())
835 .build()
836 .await
837 .unwrap();
838 let storage = SlateDbStorage::new(Arc::new(db));
839
840 storage
841 .apply_with_options(
842 vec![RecordOp::Put(
843 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
844 )],
845 WriteOptions {
846 await_durable: true,
847 },
848 )
849 .await
850 .unwrap();
851
852 assert!(reader_can_see(path, object_store.clone(), "k1").await);
853
854 storage.close().await.unwrap();
855 }
856
857 #[tokio::test]
858 async fn merge_defaults_to_not_await_durable() {
859 let object_store = Arc::new(InMemory::new());
860 let path = "/test/merge_default_durability";
861
862 let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
863 let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
864 let db = DbBuilder::new(path, object_store.clone())
865 .with_merge_operator(slate_merge_op.clone())
866 .build()
867 .await
868 .unwrap();
869 let storage = SlateDbStorage::new(Arc::new(db));
870
871 storage
873 .merge(vec![
874 Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
875 ])
876 .await
877 .unwrap();
878
879 let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
880 Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
881 assert!(
882 !reader_can_see_with_merge_op(
883 path,
884 object_store.clone(),
885 "k1",
886 Some(reader_merge_op.clone()),
887 )
888 .await
889 );
890
891 storage.flush().await.unwrap();
892 assert!(
893 reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
894 .await
895 );
896
897 storage.close().await.unwrap();
898 }
899
900 #[tokio::test]
901 async fn merge_with_await_durable_true_is_visible_to_reader() {
902 let object_store = Arc::new(InMemory::new());
903 let path = "/test/merge_durable";
904
905 let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
906 let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
907 let db = DbBuilder::new(path, object_store.clone())
908 .with_merge_operator(slate_merge_op.clone())
909 .build()
910 .await
911 .unwrap();
912 let storage = SlateDbStorage::new(Arc::new(db));
913
914 storage
915 .merge_with_options(
916 vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
917 WriteOptions {
918 await_durable: true,
919 },
920 )
921 .await
922 .unwrap();
923
924 let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
925 Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
926 assert!(
927 reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
928 .await
929 );
930
931 storage.close().await.unwrap();
932 }
933}