1use std::sync::Arc;
2
3use crate::storage::factory::OwnedHybridCache;
4use crate::storage::{MergeOptions, PutOptions};
5use crate::{
6 BytesRange, CheckpointInfo, Record, StorageError, StorageIterator, StorageRead, StorageResult,
7 Ttl,
8 storage::{
9 MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
10 WriteOptions, WriteResult,
11 },
12};
13use async_trait::async_trait;
14use bytes::Bytes;
15use slatedb::IterationOrder;
16use slatedb::config::{CheckpointOptions, CheckpointScope, ScanOptions};
17use slatedb::{
18 Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
19 MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
20};
21use tokio::sync::watch;
22use tracing::warn;
23
24pub struct SlateDbMergeOperatorAdapter {
28 operator: Arc<dyn MergeOperator>,
29}
30
31impl SlateDbMergeOperatorAdapter {
32 fn new(operator: Arc<dyn MergeOperator>) -> Self {
33 Self { operator }
34 }
35}
36
37impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
38 fn merge(
39 &self,
40 key: &Bytes,
41 existing_value: Option<Bytes>,
42 value: Bytes,
43 ) -> Result<Bytes, MergeOperatorError> {
44 Ok(self.operator.merge_batch(key, existing_value, &[value]))
45 }
46
47 fn merge_batch(
48 &self,
49 key: &Bytes,
50 existing_value: Option<Bytes>,
51 operands: &[Bytes],
52 ) -> Result<Bytes, MergeOperatorError> {
53 if operands.is_empty() && existing_value.is_none() {
54 return Err(MergeOperatorError::EmptyBatch);
55 }
56 Ok(self.operator.merge_batch(key, existing_value, operands))
57 }
58}
59
60fn default_scan_options() -> ScanOptions {
62 ScanOptions {
63 durability_filter: Default::default(),
64 dirty: false,
65 read_ahead_bytes: 1024 * 1024,
66 cache_blocks: true,
67 max_fetch_tasks: 4,
68 order: IterationOrder::Ascending,
69 filter_context: None,
70 }
71}
72
73pub struct SlateDbStorage {
78 pub(super) db: Arc<Db>,
79 durable_tx: watch::Sender<u64>,
80 durable_bridge_abort: tokio::task::AbortHandle,
81 managed_cache: Option<OwnedHybridCache>,
84}
85
86impl SlateDbStorage {
87 pub fn new(db: Arc<Db>) -> Self {
89 Self::new_with_managed_cache(db, None)
90 }
91
92 pub(crate) fn new_with_managed_cache(
96 db: Arc<Db>,
97 managed_cache: Option<OwnedHybridCache>,
98 ) -> Self {
99 let slate_rx = db.subscribe();
100 let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
101 let task = tokio::spawn({
102 let tx = durable_tx.clone();
103 async move {
104 let mut slate_rx = slate_rx;
105 while slate_rx.changed().await.is_ok() {
106 let durable_seq = slate_rx.borrow_and_update().durable_seq;
107 tx.send_replace(durable_seq);
114 }
115 }
116 });
117
118 Self {
119 db,
120 durable_tx,
121 durable_bridge_abort: task.abort_handle(),
122 managed_cache,
123 }
124 }
125
126 pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
142 SlateDbMergeOperatorAdapter::new(operator)
143 }
144}
145
146#[async_trait]
147impl StorageRead for SlateDbStorage {
148 #[tracing::instrument(level = "trace", skip_all)]
152 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
153 let value = self
154 .db
155 .get(&key)
156 .await
157 .map_err(StorageError::from_storage)?;
158
159 match value {
160 Some(v) => Ok(Some(Record::new(key, v))),
161 None => Ok(None),
162 }
163 }
164
165 #[tracing::instrument(level = "trace", skip_all)]
166 async fn scan_iter(
167 &self,
168 range: BytesRange,
169 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
170 let iter = self
171 .db
172 .scan_with_options(range, &default_scan_options())
173 .await
174 .map_err(StorageError::from_storage)?;
175 Ok(Box::new(SlateDbIterator { iter }))
176 }
177
178 async fn close(&self) -> StorageResult<()> {
179 self.durable_bridge_abort.abort();
181 self.db.close().await.map_err(StorageError::from_storage)?;
182 if let Some(cache) = &self.managed_cache
187 && let Err(e) = cache.close().await
188 {
189 warn!(error = ?e, "foyer hybrid cache close failed");
190 }
191 Ok(())
192 }
193}
194
195pub(super) struct SlateDbIterator {
196 iter: DbIterator,
197}
198
199#[async_trait]
200impl StorageIterator for SlateDbIterator {
201 #[tracing::instrument(level = "trace", skip_all)]
202 async fn next(&mut self) -> StorageResult<Option<Record>> {
203 match self.iter.next().await.map_err(StorageError::from_storage)? {
204 Some(entry) => Ok(Some(Record::new(entry.key, entry.value))),
205 None => Ok(None),
206 }
207 }
208}
209
210pub struct SlateDbStorageSnapshot {
214 snapshot: Arc<DbSnapshot>,
215}
216
217#[async_trait]
218impl StorageRead for SlateDbStorageSnapshot {
219 #[tracing::instrument(level = "trace", skip_all)]
220 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
221 let value = self
222 .snapshot
223 .get(&key)
224 .await
225 .map_err(StorageError::from_storage)?;
226
227 match value {
228 Some(v) => Ok(Some(Record::new(key, v))),
229 None => Ok(None),
230 }
231 }
232
233 #[tracing::instrument(level = "trace", skip_all)]
234 async fn scan_iter(
235 &self,
236 range: BytesRange,
237 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
238 let iter = self
239 .snapshot
240 .scan_with_options(range, &default_scan_options())
241 .await
242 .map_err(StorageError::from_storage)?;
243 Ok(Box::new(SlateDbIterator { iter }))
244 }
245}
246
247#[async_trait]
248impl StorageSnapshot for SlateDbStorageSnapshot {}
249
250#[async_trait]
251impl Storage for SlateDbStorage {
252 async fn apply_with_options(
253 &self,
254 records: Vec<RecordOp>,
255 options: WriteOptions,
256 ) -> StorageResult<WriteResult> {
257 let mut batch = WriteBatch::new();
258 for op in records {
259 match op {
260 RecordOp::Put(op) => {
261 batch.put_with_options(op.record.key, op.record.value, &op.options.into())
262 }
263 RecordOp::Merge(op) => {
264 batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
265 }
266 RecordOp::Delete(key) => batch.delete(key),
267 }
268 }
269 let slate_options = SlateDbWriteOptions {
270 await_durable: options.await_durable,
271 ..SlateDbWriteOptions::default()
272 };
273 let write_handle = self
274 .db
275 .write_with_options(batch, &slate_options)
276 .await
277 .map_err(StorageError::from_storage)?;
278 Ok(WriteResult {
279 seqnum: write_handle.seqnum(),
280 })
281 }
282
283 async fn put_with_options(
284 &self,
285 records: Vec<PutRecordOp>,
286 options: WriteOptions,
287 ) -> StorageResult<WriteResult> {
288 let mut batch = WriteBatch::new();
289 for op in records {
290 batch.put_with_options(op.record.key, op.record.value, &op.options.into());
291 }
292 let slate_options = SlateDbWriteOptions {
293 await_durable: options.await_durable,
294 ..SlateDbWriteOptions::default()
295 };
296 let write_handle = self
297 .db
298 .write_with_options(batch, &slate_options)
299 .await
300 .map_err(StorageError::from_storage)?;
301 Ok(WriteResult {
302 seqnum: write_handle.seqnum(),
303 })
304 }
305
306 async fn merge_with_options(
307 &self,
308 records: Vec<MergeRecordOp>,
309 options: WriteOptions,
310 ) -> StorageResult<WriteResult> {
311 let mut batch = WriteBatch::new();
312 for op in records {
313 batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
314 }
315 let slate_options = SlateDbWriteOptions {
316 await_durable: options.await_durable,
317 ..SlateDbWriteOptions::default()
318 };
319 let write_handle = self
320 .db
321 .write_with_options(batch, &slate_options)
322 .await
323 .map_err(|e| {
324 let error_msg = e.to_string();
325 if error_msg.contains("merge operator") || error_msg.contains("not configured") {
326 StorageError::Storage(
327 "Merge operator not configured for this database".to_string(),
328 )
329 } else {
330 StorageError::from_storage(e)
331 }
332 })?;
333 Ok(WriteResult {
334 seqnum: write_handle.seqnum(),
335 })
336 }
337
338 fn subscribe_durable(&self) -> watch::Receiver<u64> {
339 self.durable_tx.subscribe()
340 }
341
342 async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
343 let snapshot = self
344 .db
345 .snapshot()
346 .await
347 .map_err(StorageError::from_storage)?;
348 Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
349 }
350
351 async fn flush(&self) -> StorageResult<()> {
352 self.db.flush().await.map_err(StorageError::from_storage)?;
353 Ok(())
354 }
355
356 async fn create_checkpoint(&self) -> StorageResult<CheckpointInfo> {
357 let result = self
358 .db
359 .create_checkpoint(CheckpointScope::All, &CheckpointOptions::default())
360 .await
361 .map_err(StorageError::from_storage)?;
362 Ok(CheckpointInfo {
363 id: result.id,
364 manifest_id: result.manifest_id,
365 })
366 }
367}
368
369impl From<Ttl> for slatedb::config::Ttl {
370 fn from(value: Ttl) -> Self {
371 match value {
372 Ttl::Default => slatedb::config::Ttl::Default,
373 Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
374 Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
375 Ttl::ExpireAt(ts) => slatedb::config::Ttl::ExpireAt(ts),
376 }
377 }
378}
379
380impl From<PutOptions> for slatedb::config::PutOptions {
381 fn from(value: PutOptions) -> Self {
382 Self {
383 ttl: value.ttl.into(),
384 }
385 }
386}
387
388impl From<MergeOptions> for slatedb::config::MergeOptions {
389 fn from(value: MergeOptions) -> Self {
390 Self {
391 ttl: value.ttl.into(),
392 }
393 }
394}
395
396pub struct SlateDbStorageReader {
401 reader: Arc<DbReader>,
402 managed_cache: Option<OwnedHybridCache>,
405}
406
407impl SlateDbStorageReader {
408 pub fn new(reader: Arc<DbReader>) -> Self {
410 Self::new_with_managed_cache(reader, None)
411 }
412
413 pub(crate) fn new_with_managed_cache(
416 reader: Arc<DbReader>,
417 managed_cache: Option<OwnedHybridCache>,
418 ) -> Self {
419 Self {
420 reader,
421 managed_cache,
422 }
423 }
424}
425
426#[async_trait]
427impl StorageRead for SlateDbStorageReader {
428 #[tracing::instrument(level = "trace", skip_all)]
429 async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
430 let value = self
431 .reader
432 .get(&key)
433 .await
434 .map_err(StorageError::from_storage)?;
435
436 match value {
437 Some(v) => Ok(Some(Record::new(key, v))),
438 None => Ok(None),
439 }
440 }
441
442 #[tracing::instrument(level = "trace", skip_all)]
443 async fn scan_iter(
444 &self,
445 range: BytesRange,
446 ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
447 let iter = self
448 .reader
449 .scan_with_options(range, &default_scan_options())
450 .await
451 .map_err(StorageError::from_storage)?;
452 Ok(Box::new(SlateDbIterator { iter }))
453 }
454
455 async fn close(&self) -> StorageResult<()> {
456 self.reader
457 .close()
458 .await
459 .map_err(StorageError::from_storage)?;
460 if let Some(cache) = &self.managed_cache
463 && let Err(e) = cache.close().await
464 {
465 warn!(error = ?e, "foyer hybrid cache close failed");
466 }
467 Ok(())
468 }
469}
470
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::config::Settings;
    use slatedb::object_store::memory::InMemory;
    use slatedb_common::clock::MockSystemClock;

    /// Merge operator for the merge tests: appends every operand, in order, to the
    /// existing value (or to an empty value when there is none).
    struct ConcatMergeOperator;

    impl MergeOperator for ConcatMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            let mut merged = match existing_value {
                Some(existing) => existing.to_vec(),
                None => Vec::new(),
            };
            for operand in operands {
                merged.extend_from_slice(operand);
            }
            Bytes::from(merged)
        }
    }

    /// Opens a writer with default settings at `path` and wraps it in a `SlateDbStorage`.
    async fn open_storage(path: &'static str, object_store: Arc<InMemory>) -> SlateDbStorage {
        let db = DbBuilder::new(path, object_store).build().await.unwrap();
        SlateDbStorage::new(Arc::new(db))
    }

    /// Opens a reader at `path` and wraps it in a `SlateDbStorageReader`.
    async fn open_reader(
        path: &'static str,
        object_store: Arc<InMemory>,
    ) -> SlateDbStorageReader {
        let reader = DbReader::builder(path, object_store)
            .build()
            .await
            .unwrap();
        SlateDbStorageReader::new(Arc::new(reader))
    }

    /// Returns whether a freshly opened reader (no merge operator) finds `key`.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    /// Returns whether a freshly opened reader finds `key`; the reader is configured with
    /// `merge_op` when one is given.
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let builder = DbReader::builder(path, object_store);
        let builder = match merge_op {
            Some(op) => builder.with_merge_operator(op),
            None => builder,
        };
        let reader = builder.build().await.unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));

        let found = storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap();
        found.is_some()
    }

    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        // given - two records written and flushed through the writer
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";
        let storage = open_storage(path, object_store.clone()).await;

        let records: Vec<PutRecordOp> = vec![
            Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
        ];
        storage.put(records).await.unwrap();
        storage.flush().await.unwrap();

        // when - a reader is opened on the same path and object store
        let storage_reader = open_reader(path, object_store.clone()).await;

        // then - written keys are found with their values, an unknown key is not
        let expectations = [
            ("key1", Some("value1")),
            ("key2", Some("value2")),
            ("key3", None),
        ];
        for (key, expected) in expectations {
            let found = storage_reader.get(Bytes::from(key)).await.unwrap();
            assert_eq!(found.map(|record| record.value), expected.map(Bytes::from));
        }

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        // given - three records written and flushed through the writer
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";
        let storage = open_storage(path, object_store.clone()).await;

        let records: Vec<PutRecordOp> = vec![
            Record::new(Bytes::from("a"), Bytes::from("1")).into(),
            Record::new(Bytes::from("b"), Bytes::from("2")).into(),
            Record::new(Bytes::from("c"), Bytes::from("3")).into(),
        ];
        storage.put(records).await.unwrap();
        storage.flush().await.unwrap();

        // when - a reader scans the whole key space
        let storage_reader = open_reader(path, object_store.clone()).await;
        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut scanned = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            scanned.push((record.key, record.value));
        }

        // then - exactly the three records come back, in key order
        let expected: Vec<(Bytes, Bytes)> = [("a", "1"), ("b", "2"), ("c", "3")]
            .into_iter()
            .map(|(key, value)| (Bytes::from(key), Bytes::from(value)))
            .collect();
        assert_eq!(scanned, expected);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        // given - a writer that has flushed one record
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";
        let storage = open_storage(path, object_store.clone()).await;

        let first: Vec<PutRecordOp> =
            vec![Record::new(Bytes::from("key1"), Bytes::from("value1")).into()];
        storage.put(first).await.unwrap();
        storage.flush().await.unwrap();

        // when - a reader is opened while the writer is still open
        let storage_reader = open_reader(path, object_store.clone()).await;

        // then - the reader sees the flushed record
        let found = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert_eq!(
            found.map(|record| record.value),
            Some(Bytes::from("value1"))
        );

        // and - the writer can keep writing and flushing afterwards
        let second: Vec<PutRecordOp> =
            vec![Record::new(Bytes::from("key2"), Bytes::from("value2")).into()];
        storage.put(second).await.unwrap();
        storage.flush().await.unwrap();

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_set_expire_ts_based_on_ttl() {
        // given - a database with a default TTL of 30_000 and a mock system clock
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        let clock = Arc::new(MockSystemClock::new());
        let settings = Settings {
            default_ttl: Some(30_000),
            ..Default::default()
        };

        let db = DbBuilder::new(path, object_store.clone())
            .with_settings(settings)
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when - one put per TTL flavour: explicit, default, no expiry
        let explicit_ttl = PutRecordOp::new_with_options(
            Record::new(Bytes::from("key1"), Bytes::from("value1")),
            PutOptions {
                ttl: Ttl::ExpireAfter(20_000),
            },
        );
        let default_ttl = PutRecordOp::new_with_options(
            Record::new(Bytes::from("key2"), Bytes::from("value2")),
            PutOptions { ttl: Ttl::Default },
        );
        let no_expiry = PutRecordOp::new_with_options(
            Record::new(Bytes::from("key3"), Bytes::from("value3")),
            PutOptions { ttl: Ttl::NoExpiry },
        );
        storage
            .put(vec![explicit_ttl, default_ttl, no_expiry])
            .await
            .unwrap();

        // then - the explicit TTL wins over the default
        let explicit = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(explicit.expire_ts, Some(20_000));

        // and - `Ttl::Default` falls back to the configured default TTL
        let defaulted = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(defaulted.expire_ts, Some(30_000));

        // and - `Ttl::NoExpiry` leaves the record without an expiry
        let unexpiring = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(unexpiring.expire_ts, None);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn should_set_expire_ts_on_merge_records_based_on_ttl() {
        // given - a database with a default TTL of 30_000, a mock clock and the concat
        // merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());
        let settings = Settings {
            default_ttl: Some(30_000),
            ..Default::default()
        };

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let adapter = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store.clone())
            .with_settings(settings)
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(adapter))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when - one merge per TTL flavour: explicit, default, no expiry
        let explicit_ttl = MergeRecordOp::new_with_ttl(
            Record::new(Bytes::from("key1"), Bytes::from("v1")),
            MergeOptions {
                ttl: Ttl::ExpireAfter(20_000),
            },
        );
        let default_ttl = MergeRecordOp::new_with_ttl(
            Record::new(Bytes::from("key2"), Bytes::from("v2")),
            MergeOptions { ttl: Ttl::Default },
        );
        let no_expiry = MergeRecordOp::new_with_ttl(
            Record::new(Bytes::from("key3"), Bytes::from("v3")),
            MergeOptions { ttl: Ttl::NoExpiry },
        );
        storage
            .merge(vec![explicit_ttl, default_ttl, no_expiry])
            .await
            .unwrap();

        // then - each record holds its operand and the expiry implied by its TTL
        let explicit = storage.db.get_key_value(b"key1").await.unwrap().unwrap();
        assert_eq!(
            (explicit.value, explicit.expire_ts),
            (Bytes::from("v1"), Some(20_000))
        );

        let defaulted = storage.db.get_key_value(b"key2").await.unwrap().unwrap();
        assert_eq!(
            (defaulted.value, defaulted.expire_ts),
            (Bytes::from("v2"), Some(30_000))
        );

        let unexpiring = storage.db.get_key_value(b"key3").await.unwrap().unwrap();
        assert_eq!(
            (unexpiring.value, unexpiring.expire_ts),
            (Bytes::from("v3"), None)
        );

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        // given - a record written through `put` with default write options
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";
        let storage = open_storage(path, object_store.clone()).await;

        let records: Vec<PutRecordOp> =
            vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()];
        storage.put(records).await.unwrap();

        // then - a fresh reader does not see it before a flush
        let visible_before_flush = reader_can_see(path, object_store.clone(), "k1").await;
        assert!(!visible_before_flush);

        // when - the writer flushes
        storage.flush().await.unwrap();

        // then - a fresh reader sees it
        let visible_after_flush = reader_can_see(path, object_store.clone(), "k1").await;
        assert!(visible_after_flush);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        // given - a record written through `put_with_options` with `await_durable: true`
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";
        let storage = open_storage(path, object_store.clone()).await;

        let records: Vec<PutRecordOp> =
            vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()];
        let durable = WriteOptions {
            await_durable: true,
        };
        storage.put_with_options(records, durable).await.unwrap();

        // then - a fresh reader sees it without an explicit flush
        let visible = reader_can_see(path, object_store.clone(), "k1").await;
        assert!(visible);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        // given - a put applied through `apply` with default write options
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";
        let storage = open_storage(path, object_store.clone()).await;

        let ops = vec![RecordOp::Put(
            Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
        )];
        storage.apply(ops).await.unwrap();

        // then - a fresh reader does not see it before a flush
        let visible_before_flush = reader_can_see(path, object_store.clone(), "k1").await;
        assert!(!visible_before_flush);

        // when - the writer flushes
        storage.flush().await.unwrap();

        // then - a fresh reader sees it
        let visible_after_flush = reader_can_see(path, object_store.clone(), "k1").await;
        assert!(visible_after_flush);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        // given - a put applied through `apply_with_options` with `await_durable: true`
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";
        let storage = open_storage(path, object_store.clone()).await;

        let ops = vec![RecordOp::Put(
            Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
        )];
        let durable = WriteOptions {
            await_durable: true,
        };
        storage.apply_with_options(ops, durable).await.unwrap();

        // then - a fresh reader sees it without an explicit flush
        let visible = reader_can_see(path, object_store.clone(), "k1").await;
        assert!(visible);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        // given - a writer configured with the concat merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let writer_adapter = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(writer_adapter)
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when - a record is merged with default write options
        let records: Vec<MergeRecordOp> =
            vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()];
        storage.merge(records).await.unwrap();

        // then - a fresh reader (with its own adapter) does not see it before a flush
        let reader_adapter: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        let visible_before_flush = reader_can_see_with_merge_op(
            path,
            object_store.clone(),
            "k1",
            Some(reader_adapter.clone()),
        )
        .await;
        assert!(!visible_before_flush);

        // when - the writer flushes
        storage.flush().await.unwrap();

        // then - a fresh reader sees it
        let visible_after_flush =
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_adapter))
                .await;
        assert!(visible_after_flush);

        storage.close().await.unwrap();
    }

    #[tokio::test]
    async fn merge_with_await_durable_true_is_visible_to_reader() {
        // given - a writer configured with the concat merge operator
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_durable";

        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let writer_adapter = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(writer_adapter)
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));

        // when - a record is merged with `await_durable: true`
        let records: Vec<MergeRecordOp> =
            vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()];
        let durable = WriteOptions {
            await_durable: true,
        };
        storage.merge_with_options(records, durable).await.unwrap();

        // then - a fresh reader (with its own adapter) sees it without an explicit flush
        let reader_adapter: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        let visible =
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_adapter))
                .await;
        assert!(visible);

        storage.close().await.unwrap();
    }
}