datacake_eventual_consistency/
storage.rs

1use std::error::Error;
2use std::net::SocketAddr;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::Arc;
5use std::time::{Duration, Instant};
6
7use async_trait::async_trait;
8use datacake_crdt::{HLCTimestamp, Key};
9use datacake_node::NodeId;
10use datacake_rpc::Channel;
11
12use crate::core::{Document, DocumentMetadata};
13
/// A utility for tracking the progress a task has made.
///
/// Wraps a [ProgressTracker] and compares its counter against the value seen
/// on the previous check in order to decide whether the task has stalled for
/// longer than the configured timeout.
pub struct ProgressWatcher {
    // The shared tracker which the running task increments.
    inner: ProgressTracker,
    // How long the task may go without making progress before it is
    // considered expired.
    timeout: Duration,
    // When progress (or completion of construction) was last observed.
    last_tick: Instant,
    // The counter value observed on the previous `has_expired` check.
    last_observed_counter: u64,
}
21
22impl ProgressWatcher {
23    pub fn new(inner: ProgressTracker, timeout: Duration) -> Self {
24        Self {
25            inner,
26            timeout,
27            last_tick: Instant::now(),
28            last_observed_counter: 0,
29        }
30    }
31
32    /// Checks if the task has expired or made progress.
33    pub fn has_expired(&mut self) -> bool {
34        if self.is_done() {
35            return false;
36        }
37
38        let counter = self.inner.progress_counter.load(Ordering::Relaxed);
39
40        if counter > self.last_observed_counter {
41            self.last_tick = Instant::now();
42            self.last_observed_counter = counter;
43            return false;
44        }
45
46        self.last_tick.elapsed() > self.timeout
47    }
48
49    /// Returns if the task is complete or not.
50    pub fn is_done(&self) -> bool {
51        self.inner.done.load(Ordering::Relaxed)
52    }
53}
54
/// A simple atomic counter to indicate to supervisors that the given
/// operation is making progress.
///
/// This can be used in order to prevent supervisors timing out tasks
/// because they have not been completed within the target time frame.
#[derive(Default, Debug, Clone)]
pub struct ProgressTracker {
    // Monotonically increasing counter, bumped on every unit of progress.
    pub(crate) progress_counter: Arc<AtomicU64>,
    // Flag set once the task has fully completed.
    pub(crate) done: Arc<AtomicBool>,
}

impl ProgressTracker {
    /// Marks the task as complete.
    pub fn set_done(&self) {
        self.done.store(true, Ordering::Relaxed);
    }

    /// Adds a marker to the progress tracker.
    ///
    /// This is so any supervisors don't accidentally cancel or abort a task if it's
    /// taking longer than it expected.
    pub fn register_progress(&self) {
        self.progress_counter.fetch_add(1, Ordering::Relaxed);
    }
}
80
#[derive(Clone)]
/// Additional information related to the operation which can be useful.
///
/// This can be very useful if you wish to extend Datacake's storage system
/// in order to support objects which don't fit in memory etc...
pub struct PutContext {
    // Info relating to the task itself.
    // Shared progress tracker the operation can bump to avoid supervisor timeouts.
    pub(crate) progress: ProgressTracker,

    // Info relating to the remote node.
    // Unique ID of the node which originated this operation.
    pub(crate) remote_node_id: NodeId,
    // Socket address of the originating node.
    pub(crate) remote_addr: SocketAddr,
    // Already-established RPC channel to the originating node.
    pub(crate) remote_rpc_channel: Channel,
}
95
96impl PutContext {
97    #[inline]
98    /// Adds a marker to the progress tracker.
99    ///
100    /// This is so any supervisors don't accidentally cancel or abort a task if it's
101    /// taking longer than it expected.
102    pub fn register_progress(&self) {
103        self.progress.register_progress()
104    }
105
106    #[inline]
107    /// The unique ID of the remote node.
108    pub fn remote_node_id(&self) -> NodeId {
109        self.remote_node_id
110    }
111
112    #[inline]
113    /// The socket address of the remote node.
114    pub fn remote_addr(&self) -> SocketAddr {
115        self.remote_addr
116    }
117
118    #[inline]
119    /// The existing connection channel which can be used
120    /// to communicate with services ran by the Datacake server.
121    ///
122    /// Additional services can be registered to the server ran by Datacake
123    /// using the `ServiceRegistry` trait.
124    pub fn remote_channel(&self) -> &Channel {
125        &self.remote_rpc_channel
126    }
127}
128
#[derive(Debug, thiserror::Error)]
#[error("The operation was not completely successful due to error {inner}")]
/// An error which occurred while mutating the state not allowing the operation
/// to proceed any further but also having some part of the operation complete.
pub struct BulkMutationError<E>
where
    E: Error + Send + 'static,
{
    // The underlying error which aborted the bulk operation.
    pub(crate) inner: E,
    // Ids of the documents which were successfully mutated before the failure.
    pub(crate) successful_doc_ids: Vec<Key>,
}
140
141impl<E> BulkMutationError<E>
142where
143    E: Error + Send + 'static,
144{
145    /// Creates a new mutation error from the provided inner error.
146    ///
147    /// This essentially means that what ever change that was going to happen
148    /// was atomic and has therefore been revered.
149    ///
150    /// WARNING:
151    /// *You should under no circumstances return an empty mutation error if **any**
152    /// part of the state has been mutated and will not be reversed. Doing so will lead
153    /// to state divergence within the cluster*
154    pub fn empty_with_error(error: E) -> Self {
155        Self::new(error, Vec::new())
156    }
157
158    /// Creates a new mutation error from the provided inner error.
159    ///
160    /// This essentially means that although we ran into an error, we were able to
161    /// complete some part of the operation on some documents.
162    ///
163    /// WARNING:
164    /// *You should under no circumstances return an empty mutation error if **any**
165    /// part of the state has been mutated and will not be reversed. Doing so will lead
166    /// to state divergence within the cluster*
167    pub fn new(error: E, successful_doc_ids: Vec<Key>) -> Self {
168        Self {
169            inner: error,
170            successful_doc_ids,
171        }
172    }
173
174    #[inline]
175    /// The cause of the error.
176    pub fn cause(&self) -> &E {
177        &self.inner
178    }
179
180    #[inline]
181    /// Consumes the error returning the inner error.
182    pub fn into_inner(self) -> E {
183        self.inner
184    }
185
186    #[inline]
187    /// The document ids which the operation was successful on.
188    pub fn successful_doc_ids(&self) -> &[Key] {
189        &self.successful_doc_ids
190    }
191}
192
// TODO: Add default methods with more complicated handlers in order to allow room for lnx stuff.
#[async_trait]
/// The generic storage trait which encapsulates all the required persistence logic.
///
/// A test suite is available for ensuring correct behaviour of stores.
pub trait Storage: Send + Sync + 'static {
    /// The error type produced by the store when an operation fails.
    type Error: Error + Send + Sync + 'static;
    /// An iterator yielding complete documents.
    type DocsIter: Iterator<Item = Document>;
    /// An iterator yielding `(doc_id, last_updated, is_tombstone)` entries.
    type MetadataIter: Iterator<Item = (Key, HLCTimestamp, bool)>;

    /// Retrieves all keyspaces currently persisted.
    async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error>;

    /// Retrieves an iterator producing all values contained within the store.
    ///
    /// This should contain the document ID, when it was last updated and if it's a tombstone or not.
    async fn iter_metadata(
        &self,
        keyspace: &str,
    ) -> Result<Self::MetadataIter, Self::Error>;

    /// Removes a set of keys which are marked as tombstones from the store.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    async fn remove_tombstones(
        &self,
        keyspace: &str,
        keys: impl Iterator<Item = Key> + Send,
    ) -> Result<(), BulkMutationError<Self::Error>>;

    /// Inserts or updates a document in the persistent store.
    ///
    /// This is the base call for any `put` operation, and is passed the additional
    /// [PutContext] parameter which can provide additional information.
    ///
    /// In the case the context is `None`, this indicates that the operation originates
    /// from the local node itself. If context is `Some(ctx)` then it has originated from
    /// a remote node.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    ///
    /// NOTE:
    ///     It is the implementor's responsibility to ensure that this operation is atomic and durable.
    ///     Partially setting the document metadata and failing to also set the data can lead to
    ///     split state and the system will fail to converge unless a new operation comes in to modify
    ///     the document again.
    async fn put_with_ctx(
        &self,
        keyspace: &str,
        document: Document,
        _ctx: Option<&PutContext>,
    ) -> Result<(), Self::Error> {
        // Default implementation: ignore the context and defer to the plain `put`.
        self.put(keyspace, document).await
    }

    /// Inserts or updates a document in the persistent store.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    ///
    /// NOTE:
    ///     It is the implementor's responsibility to ensure that this operation is atomic and durable.
    ///     Partially setting the document metadata and failing to also set the data can lead to
    ///     split state and the system will fail to converge unless a new operation comes in to modify
    ///     the document again.
    async fn put(&self, keyspace: &str, document: Document) -> Result<(), Self::Error>;

    /// Inserts or updates a set of documents in the persistent store.
    ///
    /// This is the base call for any `multi_put` operation, and is passed the additional
    /// [PutContext] parameter which can provide additional information.
    ///
    /// In the case the context is `None`, this indicates that the operation originates
    /// from the local node itself. If context is `Some(ctx)` then it has originated from
    /// a remote node.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    async fn multi_put_with_ctx(
        &self,
        keyspace: &str,
        documents: impl Iterator<Item = Document> + Send,
        _ctx: Option<&PutContext>,
    ) -> Result<(), BulkMutationError<Self::Error>> {
        // Default implementation: ignore the context and defer to `multi_put`.
        self.multi_put(keyspace, documents).await
    }

    /// Inserts or updates a set of documents in the persistent store.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    async fn multi_put(
        &self,
        keyspace: &str,
        documents: impl Iterator<Item = Document> + Send,
    ) -> Result<(), BulkMutationError<Self::Error>>;

    /// Marks a document in the store as a tombstone.
    ///
    /// If the document does not exist this should be a no-op.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    ///
    /// NOTE:
    ///     This operation is permitted to delete the actual value of the document, but there
    ///     must be a marker indicating that the given document has been marked as deleted at
    ///     the provided timestamp.
    async fn mark_as_tombstone(
        &self,
        keyspace: &str,
        doc_id: Key,
        timestamp: HLCTimestamp,
    ) -> Result<(), Self::Error>;

    /// Marks a set of documents in the store as a tombstone.
    ///
    /// If the document does not exist this should be a no-op.
    ///
    /// If the given `keyspace` does not exist, it should be created. A new keyspace name should
    /// not result in an error being returned by the storage trait.
    ///
    /// NOTE:
    ///     This operation is permitted to delete the actual value of the document, but there
    ///     must be a marker indicating that the given document has been marked as deleted at
    ///     the provided timestamp.
    async fn mark_many_as_tombstone(
        &self,
        keyspace: &str,
        documents: impl Iterator<Item = DocumentMetadata> + Send,
    ) -> Result<(), BulkMutationError<Self::Error>>;

    /// Retrieves a single document belonging to a given keyspace from the store.
    async fn get(
        &self,
        keyspace: &str,
        doc_id: Key,
    ) -> Result<Option<Document>, Self::Error>;

    /// Retrieves a set of documents belonging to a given keyspace from the store.
    ///
    /// No error should be returned if a document id cannot be found, instead it should
    /// just be ignored.
    async fn multi_get(
        &self,
        keyspace: &str,
        doc_ids: impl Iterator<Item = Key> + Send,
    ) -> Result<Self::DocsIter, Self::Error>;
}
344
345#[cfg(any(test, feature = "test-utils", feature = "test-suite"))]
346pub mod test_suite {
347    use std::any::type_name;
348    use std::collections::HashSet;
349    use std::hash::Hash;
350
351    use datacake_crdt::{HLCTimestamp, Key};
352
353    use crate::core::Document;
354    use crate::storage::Storage;
355    use crate::{BulkMutationError, DocumentMetadata, PutContext};
356
    /// A wrapping type around another `Storage` implementation that
    /// logs all the activity going into and out of the store.
    ///
    /// This is a very useful system for debugging issues with your store.
    // Newtype wrapper: field `.0` is the wrapped store every call is forwarded to.
    pub struct InstrumentedStorage<S: Storage>(pub S);
362
363    impl<S: Storage + Clone> Clone for InstrumentedStorage<S> {
364        fn clone(&self) -> Self {
365            Self(self.0.clone())
366        }
367    }
368
369    #[async_trait::async_trait]
370    impl<S: Storage> Storage for InstrumentedStorage<S> {
371        type Error = S::Error;
372        type DocsIter = S::DocsIter;
373        type MetadataIter = S::MetadataIter;
374
375        async fn get_keyspace_list(&self) -> Result<Vec<String>, Self::Error> {
376            info!("get_keyspace_list");
377            self.0.get_keyspace_list().await
378        }
379
380        async fn iter_metadata(
381            &self,
382            keyspace: &str,
383        ) -> Result<Self::MetadataIter, Self::Error> {
384            info!(keyspace = keyspace, "iter_metadata");
385            self.0.iter_metadata(keyspace).await
386        }
387
388        async fn remove_tombstones(
389            &self,
390            keyspace: &str,
391            keys: impl Iterator<Item = Key> + Send,
392        ) -> Result<(), BulkMutationError<Self::Error>> {
393            let keys = keys.collect::<Vec<_>>();
394            info!(keyspace = keyspace, keys = ?keys, "remove_many_metadata");
395            self.0.remove_tombstones(keyspace, keys.into_iter()).await
396        }
397
398        async fn put_with_ctx(
399            &self,
400            keyspace: &str,
401            document: Document,
402            ctx: Option<&PutContext>,
403        ) -> Result<(), Self::Error> {
404            info!(keyspace = keyspace, document = ?document, "put_with_ctx");
405            self.0.put_with_ctx(keyspace, document, ctx).await
406        }
407
408        async fn put(
409            &self,
410            keyspace: &str,
411            document: Document,
412        ) -> Result<(), Self::Error> {
413            info!(keyspace = keyspace, document = ?document, "put");
414            self.0.put(keyspace, document).await
415        }
416
417        async fn multi_put_with_ctx(
418            &self,
419            keyspace: &str,
420            documents: impl Iterator<Item = Document> + Send,
421            ctx: Option<&PutContext>,
422        ) -> Result<(), BulkMutationError<Self::Error>> {
423            let documents = documents.collect::<Vec<_>>();
424            info!(keyspace = keyspace, documents = ?documents, "put_with_ctx");
425            self.0
426                .multi_put_with_ctx(keyspace, documents.into_iter(), ctx)
427                .await
428        }
429
430        async fn multi_put(
431            &self,
432            keyspace: &str,
433            documents: impl Iterator<Item = Document> + Send,
434        ) -> Result<(), BulkMutationError<Self::Error>> {
435            let documents = documents.collect::<Vec<_>>();
436            info!(keyspace = keyspace, documents = ?documents, "multi_put");
437            self.0.multi_put(keyspace, documents.into_iter()).await
438        }
439
440        async fn mark_as_tombstone(
441            &self,
442            keyspace: &str,
443            doc_id: Key,
444            timestamp: HLCTimestamp,
445        ) -> Result<(), Self::Error> {
446            info!(keyspace = keyspace, doc_id = doc_id, timestamp = %timestamp, "mark_as_tombstone");
447            self.0.mark_as_tombstone(keyspace, doc_id, timestamp).await
448        }
449
450        async fn mark_many_as_tombstone(
451            &self,
452            keyspace: &str,
453            documents: impl Iterator<Item = DocumentMetadata> + Send,
454        ) -> Result<(), BulkMutationError<Self::Error>> {
455            let documents = documents.collect::<Vec<_>>();
456            info!(keyspace = keyspace, documents = ?documents, "mark_many_as_tombstone");
457            self.0
458                .mark_many_as_tombstone(keyspace, documents.into_iter())
459                .await
460        }
461
462        async fn get(
463            &self,
464            keyspace: &str,
465            doc_id: Key,
466        ) -> Result<Option<Document>, Self::Error> {
467            info!(keyspace = keyspace, doc_id = doc_id, "get");
468            self.0.get(keyspace, doc_id).await
469        }
470
471        async fn multi_get(
472            &self,
473            keyspace: &str,
474            doc_ids: impl Iterator<Item = Key> + Send,
475        ) -> Result<Self::DocsIter, Self::Error> {
476            let doc_ids = doc_ids.collect::<Vec<_>>();
477            info!(keyspace = keyspace, doc_ids = ?doc_ids, "multi_get");
478            self.0.multi_get(keyspace, doc_ids.into_iter()).await
479        }
480    }
481
    /// Smoke-tests the suite itself against the crate's in-memory reference store.
    #[tokio::test]
    async fn test_suite_semantics() {
        use crate::test_utils::MemStore;
        // Ignore the error if another test already installed a global subscriber.
        let _ = tracing_subscriber::fmt::try_init();
        run_test_suite(MemStore::default()).await
    }
488
489    pub async fn run_test_suite<S: Storage>(storage: S) {
490        let mut clock = HLCTimestamp::now(0, 0);
491        info!("Starting test suite for storage: {}", type_name::<S>());
492
493        let storage = InstrumentedStorage(storage);
494
495        test_keyspace_semantics(&storage, &mut clock).await;
496        info!("test_keyspace_semantics OK");
497
498        test_basic_persistence_test(&storage, &mut clock).await;
499        info!("test_basic_persistence_test OK");
500
501        test_basic_metadata_test(&storage, &mut clock).await;
502        info!("test_basic_metadata_test OK");
503    }
504
    #[instrument(name = "test_keyspace_semantics", skip(storage))]
    /// Checks that keyspaces are created lazily and kept isolated from each other.
    async fn test_keyspace_semantics<S: Storage + Sync>(
        storage: &S,
        clock: &mut HLCTimestamp,
    ) {
        info!("Starting test");

        static KEYSPACE: &str = "first-keyspace";

        // Reading metadata from a keyspace that has never been written to
        // must succeed rather than error.
        let res = storage.iter_metadata(KEYSPACE).await;
        if let Err(e) = res {
            panic!(
                "Iterating through keyspace metadata should return OK. Got {:?}",
                e
            );
        }

        // ...and such a keyspace must start out empty.
        let metadata = storage
            .iter_metadata(KEYSPACE)
            .await
            .expect("Produce metadata iterator.")
            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
        assert_eq!(metadata, to_hashset([]), "New keyspace should be empty.");

        // Writing to a brand-new keyspace must create it implicitly.
        let doc = Document::new(1, clock.send().unwrap(), Vec::new());
        let res = storage.put_with_ctx(KEYSPACE, doc, None).await;
        assert!(
            res.is_ok(),
            "Setting metadata on a new keyspace should not error. Got {:?}",
            res
        );

        let doc = Document::new(2, clock.send().unwrap(), Vec::new());
        let res = storage.put_with_ctx(KEYSPACE, doc, None).await;
        assert!(
            res.is_ok(),
            "Setting metadata on a existing keyspace should not error. Got {:?}",
            res
        );

        // Both writes should now be visible in the keyspace's metadata.
        let metadata = storage
            .iter_metadata(KEYSPACE)
            .await
            .expect("Produce metadata iterator.")
            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
        assert_eq!(
            metadata.len(),
            2,
            "First keyspace should contain 2 entries."
        );

        // Only the keyspace that was actually written to should be listed.
        let keyspace_list = storage
            .get_keyspace_list()
            .await
            .expect("Get keyspace list");
        assert_eq!(
            keyspace_list,
            vec![KEYSPACE.to_string()],
            "Returned keyspace list (left) should match value provided (right)."
        );

        // Writes to the first keyspace must not leak into other keyspaces.
        let metadata = storage
            .iter_metadata("second-keyspace")
            .await
            .expect("Produce metadata iterator.")
            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
        assert_eq!(metadata, to_hashset([]), "Second keyspace should be empty.");
    }
573
574    #[instrument(name = "test_basic_metadata_test", skip(storage))]
575    async fn test_basic_metadata_test<S: Storage>(
576        storage: &S,
577        clock: &mut HLCTimestamp,
578    ) {
579        info!("Starting test");
580
581        static KEYSPACE: &str = "metadata-test-keyspace";
582
583        let mut doc_1 = Document::new(1, clock.send().unwrap(), Vec::new());
584        let mut doc_2 = Document::new(2, clock.send().unwrap(), Vec::new());
585        let mut doc_3 = Document::new(3, clock.send().unwrap(), Vec::new());
586        storage
587            .multi_put(
588                KEYSPACE,
589                [doc_1.clone(), doc_2.clone(), doc_3.clone()].into_iter(),
590            )
591            .await
592            .expect("Put documents");
593
594        doc_3.metadata.last_updated = clock.send().unwrap();
595        storage
596            .mark_as_tombstone(KEYSPACE, doc_3.id(), doc_3.last_updated())
597            .await
598            .expect("Mark document as tombstone.");
599
600        let metadata = storage
601            .iter_metadata(KEYSPACE)
602            .await
603            .expect("Produce metadata iterator.")
604            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
605        assert_eq!(
606            metadata,
607            to_hashset([
608                (doc_1.id(), doc_1.last_updated(), false),
609                (doc_2.id(), doc_2.last_updated(), false),
610                (doc_3.id(), doc_3.last_updated(), true),
611            ]),
612            "Persisted metadata entries should match expected values."
613        );
614
615        doc_1.metadata.last_updated = clock.send().unwrap();
616        doc_2.metadata.last_updated = clock.send().unwrap();
617        storage
618            .mark_many_as_tombstone(
619                KEYSPACE,
620                [doc_1.metadata, doc_2.metadata].into_iter(),
621            )
622            .await
623            .expect("Mark documents as tombstones.");
624        let metadata = storage
625            .iter_metadata(KEYSPACE)
626            .await
627            .expect("Produce metadata iterator.")
628            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
629        assert_eq!(
630            metadata,
631            to_hashset([
632                (doc_1.id(), doc_1.last_updated(), true),
633                (doc_2.id(), doc_2.last_updated(), true),
634                (doc_3.id(), doc_3.last_updated(), true),
635            ]),
636            "Persisted metadata entries should match expected values."
637        );
638
639        storage
640            .remove_tombstones(KEYSPACE, [1, 2].into_iter())
641            .await
642            .expect("Remove tombstone entries.");
643        let metadata = storage
644            .iter_metadata(KEYSPACE)
645            .await
646            .expect("Produce metadata iterator.")
647            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
648        assert_eq!(
649            metadata,
650            to_hashset([(doc_3.id(), doc_3.last_updated(), true)]),
651            "Persisted metadata entries should match expected values after removal."
652        );
653
654        doc_1.metadata.last_updated = clock.send().unwrap();
655        doc_2.metadata.last_updated = clock.send().unwrap();
656        doc_3.metadata.last_updated = clock.send().unwrap();
657        storage
658            .multi_put(
659                KEYSPACE,
660                [doc_1.clone(), doc_2.clone(), doc_3.clone()].into_iter(),
661            )
662            .await
663            .expect("Set metadata entry 3.");
664        let metadata = storage
665            .iter_metadata(KEYSPACE)
666            .await
667            .expect("Produce metadata iterator.")
668            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
669        assert_eq!(
670            metadata,
671            to_hashset([
672                (doc_1.id(), doc_1.last_updated(), false),
673                (doc_2.id(), doc_2.last_updated(), false),
674                (doc_3.id(), doc_3.last_updated(), false),
675            ]),
676            "Persisted metadata entries should match expected values after update."
677        );
678
679        doc_1.metadata.last_updated = clock.send().unwrap();
680        doc_2.metadata.last_updated = clock.send().unwrap();
681        doc_3.metadata.last_updated = clock.send().unwrap();
682        storage
683            .mark_many_as_tombstone(
684                KEYSPACE,
685                [doc_1.metadata, doc_2.metadata, doc_3.metadata].into_iter(),
686            )
687            .await
688            .expect("Mark documents as tombstones.");
689        let res = storage
690            .remove_tombstones(KEYSPACE, [1, 2, 3].into_iter())
691            .await;
692        assert!(
693            res.is_ok(),
694            "Expected successful removal of given metadata keys. Got: {:?}",
695            res
696        );
697
698        let metadata = storage
699            .iter_metadata(KEYSPACE)
700            .await
701            .expect("Produce metadata iterator.")
702            .count();
703        assert_eq!(
704            metadata, 0,
705            "Persisted metadata entries should be empty after tombstone purge."
706        );
707
708        doc_1.metadata.last_updated = clock.send().unwrap();
709        doc_2.metadata.last_updated = clock.send().unwrap();
710        doc_3.metadata.last_updated = clock.send().unwrap();
711        let doc_4_ts = clock.send().unwrap();
712        storage
713            .mark_many_as_tombstone(
714                KEYSPACE,
715                [
716                    doc_1.metadata,
717                    doc_2.metadata,
718                    doc_3.metadata,
719                    DocumentMetadata::new(4, doc_4_ts),
720                ]
721                .into_iter(),
722            )
723            .await
724            .expect("Mark documents as tombstones.");
725        let metadata = storage
726            .iter_metadata(KEYSPACE)
727            .await
728            .expect("Produce metadata iterator.")
729            .collect::<HashSet<(Key, HLCTimestamp, bool)>>();
730        assert_eq!(
731            metadata,
732            to_hashset([
733                (doc_1.id(), doc_1.last_updated(), true),
734                (doc_2.id(), doc_2.last_updated(), true),
735                (doc_3.id(), doc_3.last_updated(), true),
736                (4, doc_4_ts, true),
737            ]),
738            "Persisted tombstones should be tracked."
739        );
740    }
741
742    #[instrument(name = "test_basic_persistence_test", skip(storage))]
743    async fn test_basic_persistence_test<S: Storage + Sync>(
744        storage: &S,
745        clock: &mut HLCTimestamp,
746    ) {
747        info!("Starting test");
748
749        static KEYSPACE: &str = "persistence-test-keyspace";
750
751        let res = storage.get(KEYSPACE, 1).await;
752        assert!(
753            res.is_ok(),
754            "Expected successful get request. Got: {:?}",
755            res
756        );
757        assert!(
758            res.unwrap().is_none(),
759            "Expected no document to be returned."
760        );
761
762        #[allow(clippy::needless_collect)]
763        let res = storage
764            .multi_get(KEYSPACE, [1, 2, 3].into_iter())
765            .await
766            .expect("Expected successful get request.")
767            .collect::<Vec<_>>();
768        assert!(res.is_empty(), "Expected no document to be returned.");
769
770        let mut doc_1 =
771            Document::new(1, clock.send().unwrap(), b"Hello, world!".to_vec());
772        let mut doc_2 = Document::new(2, clock.send().unwrap(), Vec::new());
773        let mut doc_3 = Document::new(
774            3,
775            clock.send().unwrap(),
776            b"Hello, from document 3!".to_vec(),
777        );
778        let doc_3_updated = Document::new(
779            3,
780            clock.send().unwrap(),
781            b"Hello, from document 3 With an update!".to_vec(),
782        );
783
784        storage
785            .put_with_ctx(KEYSPACE, doc_1.clone(), None)
786            .await
787            .expect("Put document in persistent store.");
788        let res = storage.get(KEYSPACE, 1).await;
789        assert!(
790            res.is_ok(),
791            "Expected successful get request. Got: {:?}",
792            res
793        );
794        let doc = res
795            .unwrap()
796            .expect("Expected document to be returned after inserting doc.");
797        assert_eq!(doc, doc_1, "Returned document should match.");
798
799        storage
800            .multi_put(KEYSPACE, [doc_3.clone(), doc_2.clone()].into_iter())
801            .await
802            .expect("Put document in persistent store.");
803        let res = storage
804            .multi_get(KEYSPACE, [1, 2, 3].into_iter())
805            .await
806            .expect("Expected successful get request.")
807            .collect::<HashSet<_>>();
808        assert_eq!(
809            res,
810            to_hashset([doc_1.clone(), doc_2.clone(), doc_3.clone()]),
811            "Documents returned should match provided."
812        );
813
814        storage
815            .put_with_ctx(KEYSPACE, doc_3_updated.clone(), None)
816            .await
817            .expect("Put updated document in persistent store.");
818        let res = storage
819            .get(KEYSPACE, 3)
820            .await
821            .expect("Get updated document.");
822        let doc = res.expect("Expected document to be returned after updating doc.");
823        assert_eq!(doc, doc_3_updated, "Returned document should match.");
824
825        doc_2.metadata.last_updated = clock.send().unwrap();
826        storage
827            .mark_as_tombstone(KEYSPACE, doc_2.id(), doc_2.last_updated())
828            .await
829            .expect("Mark document as tombstone.");
830        let res = storage.get(KEYSPACE, 2).await;
831        assert!(
832            res.is_ok(),
833            "Expected successful get request. Got: {:?}",
834            res
835        );
836        assert!(
837            res.unwrap().is_none(),
838            "Expected no document to be returned."
839        );
840
841        doc_1.metadata.last_updated = clock.send().unwrap();
842        doc_2.metadata.last_updated = clock.send().unwrap();
843        storage
844            .mark_many_as_tombstone(
845                KEYSPACE,
846                [
847                    doc_1.metadata,
848                    doc_2.metadata,
849                    DocumentMetadata::new(4, clock.send().unwrap()),
850                ]
851                .into_iter(),
852            )
853            .await
854            .expect("Merk documents as tombstones");
855        let res = storage
856            .multi_get(KEYSPACE, [1, 2, 3].into_iter())
857            .await
858            .expect("Expected successful get request.")
859            .collect::<HashSet<_>>();
860        assert_eq!(
861            res,
862            to_hashset([doc_3_updated]),
863            "Expected returned documents to match.",
864        );
865
866        doc_3.metadata.last_updated = clock.send().unwrap();
867        storage
868            .mark_as_tombstone(KEYSPACE, doc_3.id(), doc_3.last_updated())
869            .await
870            .expect("Delete documents from store.");
871        #[allow(clippy::needless_collect)]
872        let res = storage
873            .multi_get(KEYSPACE, [1, 2, 3].into_iter())
874            .await
875            .expect("Expected successful get request.")
876            .collect::<Vec<_>>();
877        assert!(res.is_empty(), "Expected no documents to be returned.");
878    }
879
880    fn to_hashset<T: Hash + Eq>(iter: impl IntoIterator<Item = T>) -> HashSet<T> {
881        iter.into_iter().collect()
882    }
883}