1use std::error::Error;
2use std::net::SocketAddr;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use async_trait::async_trait;
8use datacake_crdt::{HLCTimestamp, Key};
9use datacake_node::NodeId;
10use datacake_rpc::Channel;
11
12use crate::core::{Document, DocumentMetadata};
13
14pub struct ProgressWatcher {
16 inner: ProgressTracker,
17 timeout: Duration,
18 last_tick: Instant,
19 last_observed_counter: u64,
20}
21
22impl ProgressWatcher {
23 pub fn new(inner: ProgressTracker, timeout: Duration) -> Self {
24 Self {
25 inner,
26 timeout,
27 last_tick: Instant::now(),
28 last_observed_counter: 0,
29 }
30 }
31
32 pub fn has_expired(&mut self) -> bool {
34 if self.is_done() {
35 return false;
36 }
37
38 let counter = self.inner.progress_counter.load(Ordering::Relaxed);
39
40 if counter > self.last_observed_counter {
41 self.last_tick = Instant::now();
42 self.last_observed_counter = counter;
43 return false;
44 }
45
46 self.last_tick.elapsed() > self.timeout
47 }
48
49 pub fn is_done(&self) -> bool {
51 self.inner.done.load(Ordering::Relaxed)
52 }
53}
54
/// Shared progress state for a task: a monotonically bumped counter and a
/// completion flag.
///
/// Both fields live behind an [`Arc`], so cloning the tracker yields a handle to
/// the same counter and flag rather than an independent copy.
#[derive(Debug, Default, Clone)]
pub struct ProgressTracker {
    /// Number of progress events registered so far.
    pub(crate) progress_counter: Arc<AtomicU64>,
    /// Set to `true` once the task has been marked as done.
    pub(crate) done: Arc<AtomicBool>,
}
65
66impl ProgressTracker {
67 pub fn register_progress(&self) {
72 self.progress_counter.fetch_add(1, Ordering::Relaxed);
73 }
74
75 pub fn set_done(&self) {
77 self.done.store(true, Ordering::Relaxed);
78 }
79}
80
81#[derive(Clone)]
82pub struct PutContext {
87 pub(crate) progress: ProgressTracker,
89
90 pub(crate) remote_node_id: NodeId,
92 pub(crate) remote_addr: SocketAddr,
93 pub(crate) remote_rpc_channel: Channel,
94}
95
96impl PutContext {
97 #[inline]
98 pub fn register_progress(&self) {
103 self.progress.register_progress()
104 }
105
106 #[inline]
107 pub fn remote_node_id(&self) -> NodeId {
109 self.remote_node_id
110 }
111
112 #[inline]
113 pub fn remote_addr(&self) -> SocketAddr {
115 self.remote_addr
116 }
117
118 #[inline]
119 pub fn remote_channel(&self) -> &Channel {
125 &self.remote_rpc_channel
126 }
127}
128
129#[derive(Debug, thiserror::Error)]
130#[error("The operation was not completely successful due to error {inner}")]
131pub struct BulkMutationError<E>
134where
135 E: Error + Send + 'static,
136{
137 pub(crate) inner: E,
138 pub(crate) successful_doc_ids: Vec<Key>,
139}
140
141impl<E> BulkMutationError<E>
142where
143 E: Error + Send + 'static,
144{
145 pub fn empty_with_error(error: E) -> Self {
155 Self::new(error, Vec::new())
156 }
157
158 pub fn new(error: E, successful_doc_ids: Vec<Key>) -> Self {
168 Self {
169 inner: error,
170 successful_doc_ids,
171 }
172 }
173
174 #[inline]
175 pub fn cause(&self) -> &E {
177 &self.inner
178 }
179
180 #[inline]
181 pub fn into_inner(self) -> E {
183 self.inner
184 }
185
186 #[inline]
187 pub fn successful_doc_ids(&self) -> &[Key] {
189 &self.successful_doc_ids
190 }
191}
192
193#[async_trait]
195pub trait Storage: Send + Sync + 'static {
199 type Error: Error + Send + Sync + 'static;
200 type DocsIter: Iterator<Item = Document>;
201 type MetadataIter: Iterator<Item = (Key, HLCTimestamp, bool)>;
202
203 async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error>;
205
206 async fn iter_metadata(
210 &self,
211 keyspace: &str,
212 ) -> Result<Self::MetadataIter, Self::Error>;
213
214 async fn remove_tombstones(
219 &self,
220 keyspace: &str,
221 keys: impl Iterator<Item = Key> + Send,
222 ) -> Result<(), BulkMutationError<Self::Error>>;
223
224 async fn put_with_ctx(
242 &self,
243 keyspace: &str,
244 document: Document,
245 _ctx: Option<&PutContext>,
246 ) -> Result<(), Self::Error> {
247 self.put(keyspace, document).await
248 }
249
250 async fn put(&self, keyspace: &str, document: Document) -> Result<(), Self::Error>;
261
262 async fn multi_put_with_ctx(
274 &self,
275 keyspace: &str,
276 documents: impl Iterator<Item = Document> + Send,
277 _ctx: Option<&PutContext>,
278 ) -> Result<(), BulkMutationError<Self::Error>> {
279 self.multi_put(keyspace, documents).await
280 }
281
282 async fn multi_put(
287 &self,
288 keyspace: &str,
289 documents: impl Iterator<Item = Document> + Send,
290 ) -> Result<(), BulkMutationError<Self::Error>>;
291
292 async fn mark_as_tombstone(
304 &self,
305 keyspace: &str,
306 doc_id: Key,
307 timestamp: HLCTimestamp,
308 ) -> Result<(), Self::Error>;
309
310 async fn mark_many_as_tombstone(
322 &self,
323 keyspace: &str,
324 documents: impl Iterator<Item = DocumentMetadata> + Send,
325 ) -> Result<(), BulkMutationError<Self::Error>>;
326
327 async fn get(
329 &self,
330 keyspace: &str,
331 doc_id: Key,
332 ) -> Result<Option<Document>, Self::Error>;
333
334 async fn multi_get(
339 &self,
340 keyspace: &str,
341 doc_ids: impl Iterator<Item = Key> + Send,
342 ) -> Result<Self::DocsIter, Self::Error>;
343}
344
345#[cfg(any(test, feature = "test-utils", feature = "test-suite"))]
346pub mod test_suite {
347 use std::any::type_name;
348 use std::collections::HashSet;
349 use std::hash::Hash;
350
351 use datacake_crdt::{HLCTimestamp, Key};
352
353 use crate::core::Document;
354 use crate::storage::Storage;
355 use crate::{BulkMutationError, DocumentMetadata, PutContext};
356
357 pub struct InstrumentedStorage<S: Storage>(pub S);
362
363 impl<S: Storage + Clone> Clone for InstrumentedStorage<S> {
364 fn clone(&self) -> Self {
365 Self(self.0.clone())
366 }
367 }
368
369 #[async_trait::async_trait]
370 impl<S: Storage> Storage for InstrumentedStorage<S> {
371 type Error = S::Error;
372 type DocsIter = S::DocsIter;
373 type MetadataIter = S::MetadataIter;
374
375 async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error> {
376 info!("get_keyspace_list");
377 self.0.get_keyspace_list().await
378 }
379
380 async fn iter_metadata(
381 &self,
382 keyspace: &str,
383 ) -> Result<Self::MetadataIter, Self::Error> {
384 info!(keyspace = keyspace, "iter_metadata");
385 self.0.iter_metadata(keyspace).await
386 }
387
388 async fn remove_tombstones(
389 &self,
390 keyspace: &str,
391 keys: impl Iterator<Item = Key> + Send,
392 ) -> Result<(), BulkMutationError<Self::Error>> {
393 let keys = keys.collect::<Vec<_>>();
394 info!(keyspace = keyspace, keys = ?keys, "remove_many_metadata");
395 self.0.remove_tombstones(keyspace, keys.into_iter()).await
396 }
397
398 async fn put_with_ctx(
399 &self,
400 keyspace: &str,
401 document: Document,
402 ctx: Option<&PutContext>,
403 ) -> Result<(), Self::Error> {
404 info!(keyspace = keyspace, document = ?document, "put_with_ctx");
405 self.0.put_with_ctx(keyspace, document, ctx).await
406 }
407
408 async fn put(
409 &self,
410 keyspace: &str,
411 document: Document,
412 ) -> Result<(), Self::Error> {
413 info!(keyspace = keyspace, document = ?document, "put");
414 self.0.put(keyspace, document).await
415 }
416
417 async fn multi_put_with_ctx(
418 &self,
419 keyspace: &str,
420 documents: impl Iterator<Item = Document> + Send,
421 ctx: Option<&PutContext>,
422 ) -> Result<(), BulkMutationError<Self::Error>> {
423 let documents = documents.collect::<Vec<_>>();
424 info!(keyspace = keyspace, documents = ?documents, "put_with_ctx");
425 self.0
426 .multi_put_with_ctx(keyspace, documents.into_iter(), ctx)
427 .await
428 }
429
430 async fn multi_put(
431 &self,
432 keyspace: &str,
433 documents: impl Iterator<Item = Document> + Send,
434 ) -> Result<(), BulkMutationError<Self::Error>> {
435 let documents = documents.collect::<Vec<_>>();
436 info!(keyspace = keyspace, documents = ?documents, "multi_put");
437 self.0.multi_put(keyspace, documents.into_iter()).await
438 }
439
440 async fn mark_as_tombstone(
441 &self,
442 keyspace: &str,
443 doc_id: Key,
444 timestamp: HLCTimestamp,
445 ) -> Result<(), Self::Error> {
446 info!(keyspace = keyspace, doc_id = doc_id, timestamp = %timestamp, "mark_as_tombstone");
447 self.0.mark_as_tombstone(keyspace, doc_id, timestamp).await
448 }
449
450 async fn mark_many_as_tombstone(
451 &self,
452 keyspace: &str,
453 documents: impl Iterator<Item = DocumentMetadata> + Send,
454 ) -> Result<(), BulkMutationError<Self::Error>> {
455 let documents = documents.collect::<Vec<_>>();
456 info!(keyspace = keyspace, documents = ?documents, "mark_many_as_tombstone");
457 self.0
458 .mark_many_as_tombstone(keyspace, documents.into_iter())
459 .await
460 }
461
462 async fn get(
463 &self,
464 keyspace: &str,
465 doc_id: Key,
466 ) -> Result<Option<Document>, Self::Error> {
467 info!(keyspace = keyspace, doc_id = doc_id, "get");
468 self.0.get(keyspace, doc_id).await
469 }
470
471 async fn multi_get(
472 &self,
473 keyspace: &str,
474 doc_ids: impl Iterator<Item = Key> + Send,
475 ) -> Result<Self::DocsIter, Self::Error> {
476 let doc_ids = doc_ids.collect::<Vec<_>>();
477 info!(keyspace = keyspace, doc_ids = ?doc_ids, "multi_get");
478 self.0.multi_get(keyspace, doc_ids.into_iter()).await
479 }
480 }
481
482 #[tokio::test]
483 async fn test_suite_semantics() {
484 use crate::test_utils::MemStore;
485 let _ = tracing_subscriber::fmt::try_init();
486 run_test_suite(MemStore::default()).await
487 }
488
489 pub async fn run_test_suite<S: Storage>(storage: S) {
490 let mut clock = HLCTimestamp::now(0, 0);
491 info!("Starting test suite for storage: {}", type_name::<S>());
492
493 let storage = InstrumentedStorage(storage);
494
495 test_keyspace_semantics(&storage, &mut clock).await;
496 info!("test_keyspace_semantics OK");
497
498 test_basic_persistence_test(&storage, &mut clock).await;
499 info!("test_basic_persistence_test OK");
500
501 test_basic_metadata_test(&storage, &mut clock).await;
502 info!("test_basic_metadata_test OK");
503 }
504
505 #[instrument(name = "test_keyspace_semantics", skip(storage))]
506 async fn test_keyspace_semantics<S: Storage + Sync>(
507 storage: &S,
508 clock: &mut HLCTimestamp,
509 ) {
510 info!("Starting test");
511
512 static KEYSPACE: &str = "first-keyspace";
513
514 let res = storage.iter_metadata(KEYSPACE).await;
515 if let Err(e) = res {
516 panic!(
517 "Iterating through keyspace metadata should return OK. Got {:?}",
518 e
519 );
520 }
521
522 let metadata = storage
523 .iter_metadata(KEYSPACE)
524 .await
525 .expect("Produce metadata iterator.")
526 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
527 assert_eq!(metadata, to_hashset([]), "New keyspace should be empty.");
528
529 let doc = Document::new(1, clock.send().unwrap(), Vec::new());
530 let res = storage.put_with_ctx(KEYSPACE, doc, None).await;
531 assert!(
532 res.is_ok(),
533 "Setting metadata on a new keyspace should not error. Got {:?}",
534 res
535 );
536
537 let doc = Document::new(2, clock.send().unwrap(), Vec::new());
538 let res = storage.put_with_ctx(KEYSPACE, doc, None).await;
539 assert!(
540 res.is_ok(),
541 "Setting metadata on a existing keyspace should not error. Got {:?}",
542 res
543 );
544
545 let metadata = storage
546 .iter_metadata(KEYSPACE)
547 .await
548 .expect("Produce metadata iterator.")
549 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
550 assert_eq!(
551 metadata.len(),
552 2,
553 "First keyspace should contain 2 entries."
554 );
555
556 let keyspace_list = storage
557 .get_keyspace_list()
558 .await
559 .expect("Get keyspace list");
560 assert_eq!(
561 keyspace_list,
562 vec![KEYSPACE.to_string()],
563 "Returned keyspace list (left) should match value provided (right)."
564 );
565
566 let metadata = storage
567 .iter_metadata("second-keyspace")
568 .await
569 .expect("Produce metadata iterator.")
570 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
571 assert_eq!(metadata, to_hashset([]), "Second keyspace should be empty.");
572 }
573
574 #[instrument(name = "test_basic_metadata_test", skip(storage))]
575 async fn test_basic_metadata_test<S: Storage>(
576 storage: &S,
577 clock: &mut HLCTimestamp,
578 ) {
579 info!("Starting test");
580
581 static KEYSPACE: &str = "metadata-test-keyspace";
582
583 let mut doc_1 = Document::new(1, clock.send().unwrap(), Vec::new());
584 let mut doc_2 = Document::new(2, clock.send().unwrap(), Vec::new());
585 let mut doc_3 = Document::new(3, clock.send().unwrap(), Vec::new());
586 storage
587 .multi_put(
588 KEYSPACE,
589 [doc_1.clone(), doc_2.clone(), doc_3.clone()].into_iter(),
590 )
591 .await
592 .expect("Put documents");
593
594 doc_3.metadata.last_updated = clock.send().unwrap();
595 storage
596 .mark_as_tombstone(KEYSPACE, doc_3.id(), doc_3.last_updated())
597 .await
598 .expect("Mark document as tombstone.");
599
600 let metadata = storage
601 .iter_metadata(KEYSPACE)
602 .await
603 .expect("Produce metadata iterator.")
604 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
605 assert_eq!(
606 metadata,
607 to_hashset([
608 (doc_1.id(), doc_1.last_updated(), false),
609 (doc_2.id(), doc_2.last_updated(), false),
610 (doc_3.id(), doc_3.last_updated(), true),
611 ]),
612 "Persisted metadata entries should match expected values."
613 );
614
615 doc_1.metadata.last_updated = clock.send().unwrap();
616 doc_2.metadata.last_updated = clock.send().unwrap();
617 storage
618 .mark_many_as_tombstone(
619 KEYSPACE,
620 [doc_1.metadata, doc_2.metadata].into_iter(),
621 )
622 .await
623 .expect("Mark documents as tombstones.");
624 let metadata = storage
625 .iter_metadata(KEYSPACE)
626 .await
627 .expect("Produce metadata iterator.")
628 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
629 assert_eq!(
630 metadata,
631 to_hashset([
632 (doc_1.id(), doc_1.last_updated(), true),
633 (doc_2.id(), doc_2.last_updated(), true),
634 (doc_3.id(), doc_3.last_updated(), true),
635 ]),
636 "Persisted metadata entries should match expected values."
637 );
638
639 storage
640 .remove_tombstones(KEYSPACE, [1, 2].into_iter())
641 .await
642 .expect("Remove tombstone entries.");
643 let metadata = storage
644 .iter_metadata(KEYSPACE)
645 .await
646 .expect("Produce metadata iterator.")
647 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
648 assert_eq!(
649 metadata,
650 to_hashset([(doc_3.id(), doc_3.last_updated(), true)]),
651 "Persisted metadata entries should match expected values after removal."
652 );
653
654 doc_1.metadata.last_updated = clock.send().unwrap();
655 doc_2.metadata.last_updated = clock.send().unwrap();
656 doc_3.metadata.last_updated = clock.send().unwrap();
657 storage
658 .multi_put(
659 KEYSPACE,
660 [doc_1.clone(), doc_2.clone(), doc_3.clone()].into_iter(),
661 )
662 .await
663 .expect("Set metadata entry 3.");
664 let metadata = storage
665 .iter_metadata(KEYSPACE)
666 .await
667 .expect("Produce metadata iterator.")
668 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
669 assert_eq!(
670 metadata,
671 to_hashset([
672 (doc_1.id(), doc_1.last_updated(), false),
673 (doc_2.id(), doc_2.last_updated(), false),
674 (doc_3.id(), doc_3.last_updated(), false),
675 ]),
676 "Persisted metadata entries should match expected values after update."
677 );
678
679 doc_1.metadata.last_updated = clock.send().unwrap();
680 doc_2.metadata.last_updated = clock.send().unwrap();
681 doc_3.metadata.last_updated = clock.send().unwrap();
682 storage
683 .mark_many_as_tombstone(
684 KEYSPACE,
685 [doc_1.metadata, doc_2.metadata, doc_3.metadata].into_iter(),
686 )
687 .await
688 .expect("Mark documents as tombstones.");
689 let res = storage
690 .remove_tombstones(KEYSPACE, [1, 2, 3].into_iter())
691 .await;
692 assert!(
693 res.is_ok(),
694 "Expected successful removal of given metadata keys. Got: {:?}",
695 res
696 );
697
698 let metadata = storage
699 .iter_metadata(KEYSPACE)
700 .await
701 .expect("Produce metadata iterator.")
702 .count();
703 assert_eq!(
704 metadata, 0,
705 "Persisted metadata entries should be empty after tombstone purge."
706 );
707
708 doc_1.metadata.last_updated = clock.send().unwrap();
709 doc_2.metadata.last_updated = clock.send().unwrap();
710 doc_3.metadata.last_updated = clock.send().unwrap();
711 let doc_4_ts = clock.send().unwrap();
712 storage
713 .mark_many_as_tombstone(
714 KEYSPACE,
715 [
716 doc_1.metadata,
717 doc_2.metadata,
718 doc_3.metadata,
719 DocumentMetadata::new(4, doc_4_ts),
720 ]
721 .into_iter(),
722 )
723 .await
724 .expect("Mark documents as tombstones.");
725 let metadata = storage
726 .iter_metadata(KEYSPACE)
727 .await
728 .expect("Produce metadata iterator.")
729 .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
730 assert_eq!(
731 metadata,
732 to_hashset([
733 (doc_1.id(), doc_1.last_updated(), true),
734 (doc_2.id(), doc_2.last_updated(), true),
735 (doc_3.id(), doc_3.last_updated(), true),
736 (4, doc_4_ts, true),
737 ]),
738 "Persisted tombstones should be tracked."
739 );
740 }
741
742 #[instrument(name = "test_basic_persistence_test", skip(storage))]
743 async fn test_basic_persistence_test<S: Storage + Sync>(
744 storage: &S,
745 clock: &mut HLCTimestamp,
746 ) {
747 info!("Starting test");
748
749 static KEYSPACE: &str = "persistence-test-keyspace";
750
751 let res = storage.get(KEYSPACE, 1).await;
752 assert!(
753 res.is_ok(),
754 "Expected successful get request. Got: {:?}",
755 res
756 );
757 assert!(
758 res.unwrap().is_none(),
759 "Expected no document to be returned."
760 );
761
762 #[allow(clippy::needless_collect)]
763 let res = storage
764 .multi_get(KEYSPACE, [1, 2, 3].into_iter())
765 .await
766 .expect("Expected successful get request.")
767 .collect::<Vec<_>>();
768 assert!(res.is_empty(), "Expected no document to be returned.");
769
770 let mut doc_1 =
771 Document::new(1, clock.send().unwrap(), b"Hello, world!".to_vec());
772 let mut doc_2 = Document::new(2, clock.send().unwrap(), Vec::new());
773 let mut doc_3 = Document::new(
774 3,
775 clock.send().unwrap(),
776 b"Hello, from document 3!".to_vec(),
777 );
778 let doc_3_updated = Document::new(
779 3,
780 clock.send().unwrap(),
781 b"Hello, from document 3 With an update!".to_vec(),
782 );
783
784 storage
785 .put_with_ctx(KEYSPACE, doc_1.clone(), None)
786 .await
787 .expect("Put document in persistent store.");
788 let res = storage.get(KEYSPACE, 1).await;
789 assert!(
790 res.is_ok(),
791 "Expected successful get request. Got: {:?}",
792 res
793 );
794 let doc = res
795 .unwrap()
796 .expect("Expected document to be returned after inserting doc.");
797 assert_eq!(doc, doc_1, "Returned document should match.");
798
799 storage
800 .multi_put(KEYSPACE, [doc_3.clone(), doc_2.clone()].into_iter())
801 .await
802 .expect("Put document in persistent store.");
803 let res = storage
804 .multi_get(KEYSPACE, [1, 2, 3].into_iter())
805 .await
806 .expect("Expected successful get request.")
807 .collect::<HashSet<_>>();
808 assert_eq!(
809 res,
810 to_hashset([doc_1.clone(), doc_2.clone(), doc_3.clone()]),
811 "Documents returned should match provided."
812 );
813
814 storage
815 .put_with_ctx(KEYSPACE, doc_3_updated.clone(), None)
816 .await
817 .expect("Put updated document in persistent store.");
818 let res = storage
819 .get(KEYSPACE, 3)
820 .await
821 .expect("Get updated document.");
822 let doc = res.expect("Expected document to be returned after updating doc.");
823 assert_eq!(doc, doc_3_updated, "Returned document should match.");
824
825 doc_2.metadata.last_updated = clock.send().unwrap();
826 storage
827 .mark_as_tombstone(KEYSPACE, doc_2.id(), doc_2.last_updated())
828 .await
829 .expect("Mark document as tombstone.");
830 let res = storage.get(KEYSPACE, 2).await;
831 assert!(
832 res.is_ok(),
833 "Expected successful get request. Got: {:?}",
834 res
835 );
836 assert!(
837 res.unwrap().is_none(),
838 "Expected no document to be returned."
839 );
840
841 doc_1.metadata.last_updated = clock.send().unwrap();
842 doc_2.metadata.last_updated = clock.send().unwrap();
843 storage
844 .mark_many_as_tombstone(
845 KEYSPACE,
846 [
847 doc_1.metadata,
848 doc_2.metadata,
849 DocumentMetadata::new(4, clock.send().unwrap()),
850 ]
851 .into_iter(),
852 )
853 .await
854 .expect("Merk documents as tombstones");
855 let res = storage
856 .multi_get(KEYSPACE, [1, 2, 3].into_iter())
857 .await
858 .expect("Expected successful get request.")
859 .collect::<HashSet<_>>();
860 assert_eq!(
861 res,
862 to_hashset([doc_3_updated]),
863 "Expected returned documents to match.",
864 );
865
866 doc_3.metadata.last_updated = clock.send().unwrap();
867 storage
868 .mark_as_tombstone(KEYSPACE, doc_3.id(), doc_3.last_updated())
869 .await
870 .expect("Delete documents from store.");
871 #[allow(clippy::needless_collect)]
872 let res = storage
873 .multi_get(KEYSPACE, [1, 2, 3].into_iter())
874 .await
875 .expect("Expected successful get request.")
876 .collect::<Vec<_>>();
877 assert!(res.is_empty(), "Expected no documents to be returned.");
878 }
879
/// Gathers the items of `iter` into a [`HashSet`]; duplicate items are kept once.
fn to_hashset<T>(iter: impl IntoIterator<Item = T>) -> HashSet<T>
where
    T: Hash + Eq,
{
    let mut unique = HashSet::new();
    unique.extend(iter);
    unique
}
883}