Skip to main content

p2panda_store/
memory.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! In-memory persistence for p2panda operations and logs.
4use std::collections::{BTreeSet, HashMap};
5use std::convert::Infallible;
6use std::fmt::Debug;
7use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
8
9use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
10
11use crate::operations::{LogId, LogStore, OperationStore};
12
13type SeqNum = u64;
14type Timestamp = u64;
15type RawHeader = Vec<u8>;
16
17type LogMeta = (SeqNum, Timestamp, Hash);
18type StoredOperation<L, E> = (L, Header<E>, Option<Body>, RawHeader);
19
20/// An in-memory store for core p2panda data types: `Operation` and `Log`.
21#[derive(Clone, Debug)]
22pub struct InnerMemoryStore<L, E> {
23    operations: HashMap<Hash, StoredOperation<L, E>>,
24    logs: HashMap<(PublicKey, L), BTreeSet<LogMeta>>,
25}
26
27/// An in-memory store for core p2panda data types: `Operation` and log.
28///
29/// `MemoryStore` supports usage in asynchronous and multi-threaded contexts by wrapping an
30/// `InnerMemoryStore` with an `RwLock` and `Arc`. Convenience methods are provided to obtain a
31/// read- or write-lock on the underlying store.
32#[derive(Clone, Debug)]
33pub struct MemoryStore<L, E = ()> {
34    inner: Arc<RwLock<InnerMemoryStore<L, E>>>,
35}
36
37impl<L, E> MemoryStore<L, E> {
38    /// Create a new in-memory store.
39    pub fn new() -> Self {
40        let inner = InnerMemoryStore {
41            operations: HashMap::new(),
42            logs: HashMap::new(),
43        };
44
45        Self {
46            inner: Arc::new(RwLock::new(inner)),
47        }
48    }
49}
50
51impl<T> Default for MemoryStore<T, ()> {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl<T, E> MemoryStore<T, E> {
58    /// Obtain a read-lock on the store.
59    pub fn read_store(&self) -> RwLockReadGuard<'_, InnerMemoryStore<T, E>> {
60        self.inner
61            .read()
62            .expect("acquire shared read access on store")
63    }
64
65    /// Obtain a write-lock on the store.
66    pub fn write_store(&self) -> RwLockWriteGuard<'_, InnerMemoryStore<T, E>> {
67        self.inner
68            .write()
69            .expect("acquire exclusive write access on store")
70    }
71}
72
73impl<L, E> OperationStore<L, E> for MemoryStore<L, E>
74where
75    L: LogId + Send + Sync,
76    E: Extensions + Send + Sync,
77{
78    type Error = Infallible;
79
80    async fn insert_operation(
81        &mut self,
82        hash: Hash,
83        header: &Header<E>,
84        body: Option<&Body>,
85        header_bytes: &[u8],
86        log_id: &L,
87    ) -> Result<bool, Self::Error> {
88        let mut store = self.write_store();
89
90        let log_meta = (header.seq_num, header.timestamp, hash);
91        let insertion_occured = store
92            .logs
93            .entry((header.public_key, log_id.to_owned()))
94            .or_default()
95            .insert(log_meta);
96
97        if insertion_occured {
98            let entry = (
99                log_id.to_owned(),
100                header.to_owned(),
101                body.cloned(),
102                header_bytes.to_vec(),
103            );
104            store.operations.insert(hash, entry);
105        }
106
107        Ok(insertion_occured)
108    }
109
110    async fn get_operation(
111        &self,
112        hash: Hash,
113    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
114        match self.read_store().operations.get(&hash) {
115            Some((_, header, body, _)) => Ok(Some((header.clone(), body.clone()))),
116            None => Ok(None),
117        }
118    }
119
120    async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
121        match self.read_store().operations.get(&hash) {
122            Some((_, _, body, header_bytes)) => Ok(Some((
123                header_bytes.clone(),
124                body.as_ref().map(|body| body.to_bytes()),
125            ))),
126            None => Ok(None),
127        }
128    }
129
130    async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
131        Ok(self.read_store().operations.contains_key(&hash))
132    }
133
134    async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
135        let mut store = self.write_store();
136        let Some((_, header, _, _)) = store.operations.remove(&hash) else {
137            return Ok(false);
138        };
139        store.logs = store
140            .logs
141            .clone()
142            .into_iter()
143            .filter_map(|(key, mut log)| {
144                log.remove(&(header.seq_num, header.timestamp, hash));
145                if log.is_empty() {
146                    None
147                } else {
148                    Some((key, log))
149                }
150            })
151            .collect();
152
153        Ok(true)
154    }
155
156    async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
157        if let Some(operation) = self.write_store().operations.get_mut(&hash) {
158            operation.2 = None;
159            Ok(true)
160        } else {
161            Ok(false)
162        }
163    }
164}
165
166impl<L, E> LogStore<L, E> for MemoryStore<L, E>
167where
168    L: LogId + Send + Sync,
169    E: Extensions + Send + Sync,
170{
171    type Error = Infallible;
172
173    async fn get_log(
174        &self,
175        public_key: &PublicKey,
176        log_id: &L,
177        from: Option<u64>,
178    ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
179        let store = self.read_store();
180        match store.logs.get(&(*public_key, log_id.to_owned())) {
181            Some(log) => {
182                let mut result = Vec::new();
183                if let Some(from) = from {
184                    log.iter().for_each(|(seq_num, _, hash)| {
185                        if *seq_num >= from {
186                            let (_, header, body, _) =
187                                store.operations.get(hash).expect("exists in hash map");
188                            result.push((header.to_owned(), body.to_owned()));
189                        }
190                    });
191                } else {
192                    log.iter().for_each(|(_, _, hash)| {
193                        let (_, header, body, _) =
194                            store.operations.get(hash).expect("exists in hash map");
195                        result.push((header.to_owned(), body.to_owned()));
196                    });
197                }
198                Ok(Some(result))
199            }
200            None => Ok(None),
201        }
202    }
203
204    async fn get_raw_log(
205        &self,
206        public_key: &PublicKey,
207        log_id: &L,
208        from: Option<u64>,
209    ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
210        let store = self.read_store();
211        match store.logs.get(&(*public_key, log_id.to_owned())) {
212            Some(log) => {
213                let mut result = Vec::new();
214                if let Some(from) = from {
215                    log.iter().for_each(|(seq_num, _, hash)| {
216                        if *seq_num >= from {
217                            let (_, _, body, header_bytes) =
218                                store.operations.get(hash).expect("exists in hash map");
219                            result.push((
220                                header_bytes.clone(),
221                                body.as_ref().map(|body| body.to_bytes()),
222                            ));
223                        }
224                    });
225                } else {
226                    log.iter().for_each(|(_, _, hash)| {
227                        let (_, _, body, header_bytes) =
228                            store.operations.get(hash).expect("exists in hash map");
229                        result.push((
230                            header_bytes.clone(),
231                            body.as_ref().map(|body| body.to_bytes()),
232                        ));
233                    });
234                }
235                Ok(Some(result))
236            }
237            None => Ok(None),
238        }
239    }
240
241    async fn get_log_size(
242        &self,
243        public_key: &PublicKey,
244        log_id: &L,
245        from: Option<u64>,
246    ) -> Result<Option<u64>, Self::Error> {
247        let store = self.read_store();
248        match store.logs.get(&(*public_key, log_id.to_owned())) {
249            Some(log) => {
250                let mut bytes_count = 0;
251                if let Some(from) = from {
252                    log.iter().for_each(|(seq_num, _, hash)| {
253                        if *seq_num >= from {
254                            let (_, header, _, header_bytes) =
255                                store.operations.get(hash).expect("exists in hash map");
256                            bytes_count += header.payload_size;
257                            bytes_count += header_bytes.len() as u64;
258                        }
259                    });
260                } else {
261                    log.iter().for_each(|(_, _, hash)| {
262                        let (_, header, _, header_bytes) =
263                            store.operations.get(hash).expect("exists in hash map");
264                        bytes_count += header.payload_size;
265                        bytes_count += header_bytes.len() as u64;
266                    });
267                }
268                Ok(Some(bytes_count))
269            }
270            None => Ok(None),
271        }
272    }
273
274    async fn get_log_hashes(
275        &self,
276        public_key: &PublicKey,
277        log_id: &L,
278        from: Option<u64>,
279    ) -> Result<Option<Vec<(u64, Hash)>>, Self::Error> {
280        let store = self.read_store();
281        match store.logs.get(&(*public_key, log_id.to_owned())) {
282            Some(log) => {
283                let mut hashes = Vec::new();
284                if let Some(from) = from {
285                    log.iter().for_each(|(seq_num, _, hash)| {
286                        if *seq_num >= from {
287                            let (_, header, _, _) =
288                                store.operations.get(hash).expect("exists in hash map");
289                            hashes.push((header.seq_num, header.hash()));
290                        }
291                    });
292                } else {
293                    log.iter().for_each(|(_, _, hash)| {
294                        let (_, header, _, _) =
295                            store.operations.get(hash).expect("exists in hash map");
296                        hashes.push((header.seq_num, header.hash()));
297                    });
298                }
299                Ok(Some(hashes))
300            }
301            None => Ok(None),
302        }
303    }
304
305    async fn latest_operation(
306        &self,
307        public_key: &PublicKey,
308        log_id: &L,
309    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
310        let store = self.read_store();
311
312        let Some(log) = store.logs.get(&(*public_key, log_id.to_owned())) else {
313            return Ok(None);
314        };
315
316        let Some((_, _, hash)) = log.last() else {
317            return Ok(None);
318        };
319
320        let Some((_, header, body, _)) = store.operations.get(hash) else {
321            return Ok(None);
322        };
323
324        Ok(Some((header.to_owned(), body.to_owned())))
325    }
326
327    async fn delete_operations(
328        &mut self,
329        public_key: &PublicKey,
330        log_id: &L,
331        before: u64,
332    ) -> Result<bool, Self::Error> {
333        let mut deleted = vec![];
334        let mut store = self.write_store();
335        if let Some(log) = store.logs.get_mut(&(*public_key, log_id.to_owned())) {
336            log.retain(|(seq_num, _, hash)| {
337                let remove = *seq_num < before;
338                if remove {
339                    deleted.push(*hash);
340                };
341                !remove
342            });
343        };
344        store.operations.retain(|hash, _| !deleted.contains(hash));
345        Ok(!deleted.is_empty())
346    }
347
348    async fn delete_payloads(
349        &mut self,
350        public_key: &PublicKey,
351        log_id: &L,
352        from: u64,
353        to: u64,
354    ) -> Result<bool, Self::Error> {
355        let mut deleted = vec![];
356        {
357            let store = self.read_store();
358            if let Some(log) = store.logs.get(&(*public_key, log_id.to_owned())) {
359                log.iter().for_each(|(seq_num, _, hash)| {
360                    if *seq_num >= from && *seq_num < to {
361                        deleted.push(*hash)
362                    };
363                });
364            };
365        }
366        let mut store = self.write_store();
367        for hash in &deleted {
368            let operation = store
369                .operations
370                .get_mut(hash)
371                .expect("operation exists in store");
372            operation.2 = None;
373        }
374        Ok(!deleted.is_empty())
375    }
376
377    async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, SeqNum)>, Self::Error> {
378        let log_heights = self
379            .read_store()
380            .logs
381            .iter()
382            .filter_map(|((public_key, inner_log_id), log)| {
383                if inner_log_id == log_id {
384                    let log_height = log
385                        .last()
386                        .expect("all logs contain at least one operation")
387                        .0;
388                    Some((*public_key, log_height))
389                } else {
390                    None
391                }
392            })
393            .collect();
394        Ok(log_heights)
395    }
396}
397
398#[cfg(test)]
399mod tests {
400    use p2panda_core::{Body, Hash, Header, PrivateKey};
401    use serde::{Deserialize, Serialize};
402
403    use crate::operations::{LogStore, OperationStore};
404
405    use super::MemoryStore;
406
407    fn create_operation(
408        private_key: &PrivateKey,
409        body: &Body,
410        seq_num: u64,
411        timestamp: u64,
412        backlink: Option<Hash>,
413    ) -> (Hash, Header<()>, Vec<u8>) {
414        let mut header = Header {
415            version: 1,
416            public_key: private_key.public_key(),
417            signature: None,
418            payload_size: body.size(),
419            payload_hash: Some(body.hash()),
420            timestamp,
421            seq_num,
422            backlink,
423            previous: vec![],
424            extensions: (),
425        };
426        header.sign(private_key);
427        let header_bytes = header.to_bytes();
428        (header.hash(), header, header_bytes)
429    }
430
431    #[tokio::test]
432    async fn default_memory_store() {
433        let mut store = MemoryStore::default();
434        let private_key = PrivateKey::new();
435        let body = Body::new("hello!".as_bytes());
436
437        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
438        let inserted = store
439            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
440            .await
441            .expect("no errors");
442        assert!(inserted);
443    }
444
445    #[tokio::test]
446    async fn generic_extensions_mem_store() {
447        // Define our own custom extension type.
448        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
449        struct MyExtension {}
450
451        // Construct a new store.
452        let mut store = MemoryStore::new();
453
454        // Construct an operation using the custom extension.
455        let private_key = PrivateKey::new();
456        let body = Body::new("hello!".as_bytes());
457        let mut header = Header {
458            version: 1,
459            public_key: private_key.public_key(),
460            signature: None,
461            payload_size: body.size(),
462            payload_hash: Some(body.hash()),
463            timestamp: 0,
464            seq_num: 0,
465            backlink: None,
466            previous: vec![],
467            extensions: Some(MyExtension {}),
468        };
469        header.sign(&private_key);
470
471        // Insert the operation into the store, the extension type is inferred.
472        let inserted = store
473            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
474            .await
475            .expect("no errors");
476        assert!(inserted);
477    }
478
479    #[tokio::test]
480    async fn insert_get_operation() {
481        let mut store = MemoryStore::default();
482        let private_key = PrivateKey::new();
483        let body = Body::new("hello!".as_bytes());
484
485        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
486
487        let inserted = store
488            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
489            .await
490            .expect("no errors");
491        assert!(inserted);
492        assert!(store.has_operation(hash).await.expect("no error"));
493
494        let (header_again, body_again) = store
495            .get_operation(hash)
496            .await
497            .expect("no error")
498            .expect("operation exist");
499
500        assert_eq!(header.hash(), header_again.hash());
501        assert_eq!(Some(body.clone()), body_again);
502
503        let (header_bytes_again, body_bytes_again) = store
504            .get_raw_operation(hash)
505            .await
506            .expect("no error")
507            .expect("operation exist");
508
509        assert_eq!(header_bytes_again, header_bytes);
510        assert_eq!(body_bytes_again, Some(body.to_bytes()));
511    }
512
513    #[tokio::test]
514    async fn delete_operation() {
515        let mut store: MemoryStore<i32> = MemoryStore::default();
516        let private_key = PrivateKey::new();
517        let body = Body::new("hello!".as_bytes());
518
519        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
520
521        // Insert one operation.
522        let inserted = store
523            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
524            .await
525            .expect("no errors");
526        assert!(inserted);
527
528        // We expect one log and one operation.
529        assert_eq!(store.read_store().logs.len(), 1);
530        assert_eq!(store.read_store().operations.len(), 1);
531
532        // Delete the operation.
533        assert!(store.delete_operation(hash).await.expect("no error"));
534
535        // We expect no logs and no operations.
536        assert_eq!(store.read_store().logs.len(), 0);
537        assert_eq!(store.read_store().operations.len(), 0);
538
539        let deleted_operation = store.get_operation(hash).await.expect("no error");
540        assert!(deleted_operation.is_none());
541        assert!(!store.has_operation(hash).await.expect("no error"));
542
543        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
544        assert!(deleted_raw_operation.is_none());
545    }
546
547    #[tokio::test]
548    async fn delete_payload() {
549        let mut store = MemoryStore::default();
550        let private_key = PrivateKey::new();
551        let body = Body::new("hello!".as_bytes());
552
553        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
554
555        let inserted = store
556            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
557            .await
558            .expect("no errors");
559        assert!(inserted);
560
561        assert!(store.delete_payload(hash).await.expect("no error"));
562
563        let (_, no_body) = store
564            .get_operation(hash)
565            .await
566            .expect("no error")
567            .expect("operation exist");
568        assert!(no_body.is_none());
569        assert!(store.has_operation(hash).await.expect("no error"));
570
571        let (_, no_body) = store
572            .get_raw_operation(hash)
573            .await
574            .expect("no error")
575            .expect("operation exist");
576        assert!(no_body.is_none());
577    }
578
579    #[tokio::test]
580    async fn get_log() {
581        let mut store = MemoryStore::default();
582        let private_key = PrivateKey::new();
583        let log_id = 0;
584
585        let body_0 = Body::new("hello!".as_bytes());
586        let body_1 = Body::new("hello again!".as_bytes());
587        let body_2 = Body::new("hello for a third time!".as_bytes());
588
589        let (hash_0, header_0, header_bytes_0) =
590            create_operation(&private_key, &body_0, 0, 0, None);
591        let (hash_1, header_1, header_bytes_1) =
592            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
593        let (hash_2, header_2, header_bytes_2) =
594            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
595
596        store
597            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
598            .await
599            .expect("no errors");
600        store
601            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
602            .await
603            .expect("no errors");
604        store
605            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
606            .await
607            .expect("no errors");
608
609        // Get all log operations.
610        let log = store
611            .get_log(&private_key.public_key(), &log_id, None)
612            .await
613            .expect("no errors")
614            .expect("log should exist");
615
616        assert_eq!(log.len(), 3);
617        assert_eq!(log[0].0.hash(), hash_0);
618        assert_eq!(log[1].0.hash(), hash_1);
619        assert_eq!(log[2].0.hash(), hash_2);
620        assert_eq!(log[0].1, Some(body_0.clone()));
621        assert_eq!(log[1].1, Some(body_1.clone()));
622        assert_eq!(log[2].1, Some(body_2.clone()));
623
624        // Get all log operations starting from sequence number 1.
625        let log = store
626            .get_log(&private_key.public_key(), &log_id, Some(1))
627            .await
628            .expect("no errors")
629            .expect("log should exist");
630
631        assert_eq!(log.len(), 2);
632        assert_eq!(log[0].0.hash(), hash_1);
633        assert_eq!(log[1].0.hash(), hash_2);
634        assert_eq!(log[0].1, Some(body_1.clone()));
635        assert_eq!(log[1].1, Some(body_2.clone()));
636
637        // Get all raw log operations.
638        let log = store
639            .get_raw_log(&private_key.public_key(), &log_id, None)
640            .await
641            .expect("no errors")
642            .expect("log should exist");
643
644        assert_eq!(log.len(), 3);
645        assert_eq!(log[0].0, header_bytes_0);
646        assert_eq!(log[1].0, header_bytes_1);
647        assert_eq!(log[2].0, header_bytes_2);
648        assert_eq!(log[0].1, Some(body_0.to_bytes()));
649        assert_eq!(log[1].1, Some(body_1.to_bytes()));
650        assert_eq!(log[2].1, Some(body_2.to_bytes()));
651
652        // Get all raw log operations starting from sequence number 1.
653        let log = store
654            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
655            .await
656            .expect("no errors")
657            .expect("log should exist");
658
659        assert_eq!(log.len(), 2);
660        assert_eq!(log[0].0, header_bytes_1);
661        assert_eq!(log[1].0, header_bytes_2);
662        assert_eq!(log[0].1, Some(body_1.to_bytes()));
663        assert_eq!(log[1].1, Some(body_2.to_bytes()));
664    }
665
666    #[tokio::test]
667    async fn get_log_hashes_and_size() {
668        let mut store = MemoryStore::default();
669        let private_key = PrivateKey::new();
670        let log_id = 0;
671
672        let body_0 = Body::new("hello!".as_bytes());
673        let body_1 = Body::new("hello again!".as_bytes());
674        let body_2 = Body::new("hello for a third time!".as_bytes());
675
676        let (hash_0, header_0, header_bytes_0) =
677            create_operation(&private_key, &body_0, 0, 0, None);
678        let (hash_1, header_1, header_bytes_1) =
679            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
680        let (hash_2, header_2, header_bytes_2) =
681            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
682
683        store
684            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
685            .await
686            .expect("no errors");
687        store
688            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
689            .await
690            .expect("no errors");
691        store
692            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
693            .await
694            .expect("no errors");
695
696        // Get all log hashes.
697        let hashes = store
698            .get_log_hashes(&private_key.public_key(), &log_id, None)
699            .await
700            .expect("no errors")
701            .expect("log should exist");
702
703        assert_eq!(hashes.len(), 3);
704        assert_eq!(hashes[0], (0, hash_0));
705        assert_eq!(hashes[1], (1, hash_1));
706        assert_eq!(hashes[2], (2, hash_2));
707
708        // Get sum of log byte lengths.
709        let size = store
710            .get_log_size(&private_key.public_key(), &log_id, None)
711            .await
712            .expect("no errors")
713            .expect("log should exist");
714
715        let expected_size = header_bytes_0.len() as u64
716            + header_0.payload_size
717            + header_bytes_1.len() as u64
718            + header_1.payload_size
719            + header_bytes_2.len() as u64
720            + header_2.payload_size;
721        assert_eq!(size, expected_size);
722
723        // Get all log hashes starting from sequence number 1.
724        let hashes = store
725            .get_log_hashes(&private_key.public_key(), &log_id, Some(1))
726            .await
727            .expect("no errors")
728            .expect("log should exist");
729
730        assert_eq!(hashes.len(), 2);
731        assert_eq!(hashes[0], (1, hash_1));
732        assert_eq!(hashes[1], (2, hash_2));
733
734        // Get sum of log byte lengths from sequence number 1.
735        let size = store
736            .get_log_size(&private_key.public_key(), &log_id, Some(1))
737            .await
738            .expect("no errors")
739            .expect("log should exist");
740
741        let expected_size = header_bytes_1.len() as u64
742            + header_1.payload_size
743            + header_bytes_2.len() as u64
744            + header_2.payload_size;
745        assert_eq!(size, expected_size);
746    }
747
748    #[tokio::test]
749    async fn insert_many_get_one_log() {
750        let mut store = MemoryStore::default();
751        let private_key = PrivateKey::new();
752        let log_a_id = "a";
753        let log_b_id = "b";
754
755        let body_a0 = Body::new("hello from log a!".as_bytes());
756        let body_a1 = Body::new("hello from log a again!".as_bytes());
757        let (hash_a0, header_a0, header_bytes_a0) =
758            create_operation(&private_key, &body_a0, 0, 0, None);
759        let (hash_a1, header_a1, header_bytes_a1) =
760            create_operation(&private_key, &body_a1, 1, 1, Some(hash_a0));
761
762        let inserted = store
763            .insert_operation(
764                hash_a0,
765                &header_a0,
766                Some(&body_a0),
767                &header_bytes_a0,
768                &log_a_id,
769            )
770            .await
771            .expect("no errors");
772        assert!(inserted);
773
774        let inserted = store
775            .insert_operation(
776                hash_a1,
777                &header_a1,
778                Some(&body_a1),
779                &header_bytes_a1,
780                &log_a_id,
781            )
782            .await
783            .expect("no errors");
784        assert!(inserted);
785
786        let body_b0 = Body::new("hello from log b!".as_bytes());
787        let body_b1 = Body::new("hello from log b again!".as_bytes());
788        let (hash_b0, header_b0, header_bytes_b0) =
789            create_operation(&private_key, &body_b0, 0, 3, None);
790        let (hash_b1, header_b1, header_bytes_b1) =
791            create_operation(&private_key, &body_b1, 1, 4, Some(hash_b0));
792
793        store
794            .insert_operation(
795                hash_b0,
796                &header_b0,
797                Some(&body_b0),
798                &header_bytes_b0,
799                &log_b_id,
800            )
801            .await
802            .expect("no errors");
803
804        store
805            .insert_operation(
806                hash_b1,
807                &header_b1,
808                Some(&body_b1),
809                &header_bytes_b1,
810                &log_b_id,
811            )
812            .await
813            .expect("no errors");
814
815        let log_a = store
816            .get_log(&private_key.public_key(), &log_a_id, None)
817            .await
818            .expect("no errors")
819            .expect("log should exist");
820
821        assert_eq!(log_a.len(), 2);
822        assert_eq!(log_a[0].0.hash(), header_a0.hash());
823        assert_eq!(log_a[1].0.hash(), header_a1.hash());
824
825        let log_b = store
826            .get_log(&private_key.public_key(), &log_b_id, None)
827            .await
828            .expect("no errors")
829            .expect("log should exist");
830
831        assert_eq!(log_b.len(), 2);
832        assert_eq!(log_b[0].0.hash(), header_b0.hash());
833        assert_eq!(log_b[1].0.hash(), header_b1.hash());
834    }
835
836    #[tokio::test]
837    async fn many_authors_same_log_id() {
838        let mut store = MemoryStore::default();
839        let private_key_a = PrivateKey::new();
840        let private_key_b = PrivateKey::new();
841        let log_id = 0;
842        let body = Body::new("hello!".as_bytes());
843
844        let (hash_a, header_a, header_bytes_a) =
845            create_operation(&private_key_a, &body, 0, 0, None);
846        let inserted = store
847            .insert_operation(hash_a, &header_a, Some(&body), &header_bytes_a, &log_id)
848            .await
849            .expect("no errors");
850        assert!(inserted);
851
852        let (hash_b, header_b, header_bytes_b) =
853            create_operation(&private_key_b, &body, 0, 0, None);
854        let inserted = store
855            .insert_operation(hash_b, &header_b, Some(&body), &header_bytes_b, &log_id)
856            .await
857            .expect("no errors");
858        assert!(inserted);
859
860        let author_a_log = store
861            .get_log(&private_key_a.public_key(), &log_id, None)
862            .await
863            .expect("no errors")
864            .expect("log should exist");
865
866        assert_eq!(author_a_log.len(), 1);
867        assert_eq!(author_a_log[0].0.hash(), header_a.hash());
868
869        let author_b_log = store
870            .get_log(&private_key_b.public_key(), &log_id, None)
871            .await
872            .expect("no errors")
873            .expect("log should exist");
874
875        assert_eq!(author_b_log.len(), 1);
876        assert_eq!(author_b_log[0].0.hash(), header_b.hash());
877    }
878
879    #[tokio::test]
880    async fn get_latest_operation() {
881        let mut store = MemoryStore::default();
882        let private_key = PrivateKey::new();
883        let log_id = 0;
884
885        let body_0 = Body::new("hello!".as_bytes());
886        let body_1 = Body::new("hello again!".as_bytes());
887
888        let (hash_0, header_0, header_bytes_0) =
889            create_operation(&private_key, &body_0, 0, 0, None);
890        let (hash_1, header_1, header_bytes_1) =
891            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
892
893        store
894            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
895            .await
896            .expect("no errors");
897        store
898            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
899            .await
900            .expect("no errors");
901
902        let (latest_header, latest_body) = store
903            .latest_operation(&private_key.public_key(), &log_id)
904            .await
905            .expect("no errors")
906            .expect("there's an operation");
907
908        assert_eq!(latest_header.hash(), header_1.hash());
909        assert_eq!(latest_body, Some(body_1));
910    }
911
912    #[tokio::test]
913    async fn delete_operations() {
914        let mut store = MemoryStore::default();
915        let private_key = PrivateKey::new();
916        let log_id = 0;
917
918        let body_0 = Body::new("hello!".as_bytes());
919        let body_1 = Body::new("hello again!".as_bytes());
920        let body_2 = Body::new("final hello!".as_bytes());
921
922        let (hash_0, header_0, header_bytes_0) =
923            create_operation(&private_key, &body_0, 0, 0, None);
924        let (hash_1, header_1, header_bytes_1) =
925            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
926        let (hash_2, header_2, header_bytes_2) =
927            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
928
929        store
930            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
931            .await
932            .expect("no errors");
933        store
934            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
935            .await
936            .expect("no errors");
937        store
938            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
939            .await
940            .expect("no errors");
941
942        // We expect one log and 3 operations.
943        assert_eq!(store.read_store().logs.len(), 1);
944        assert_eq!(store.read_store().operations.len(), 3);
945
946        // Delete all operations _before_ seq_num 2.
947        let deleted = store
948            .delete_operations(&private_key.public_key(), &log_id, 2)
949            .await
950            .expect("no errors");
951        assert!(deleted);
952
953        // There is now only one operation in the log.
954        assert_eq!(store.read_store().logs.len(), 1);
955        assert_eq!(store.read_store().operations.len(), 1);
956
957        // The remaining operation in the log should be the latest (seq_num == 2).
958        let log = store
959            .get_log(&private_key.public_key(), &log_id, None)
960            .await
961            .expect("no errors")
962            .expect("log should exist");
963        assert_eq!(log[0].0.hash(), header_2.hash());
964
965        // Deleting the same range again should return `false`, meaning no deletion occurred.
966        let deleted = store
967            .delete_operations(&private_key.public_key(), &log_id, 2)
968            .await
969            .expect("no errors");
970        assert!(!deleted);
971    }
972
973    #[tokio::test]
974    async fn delete_payloads() {
975        let mut store = MemoryStore::default();
976        let private_key = PrivateKey::new();
977        let log_id = 0;
978
979        let body_0 = Body::new("hello!".as_bytes());
980        let body_1 = Body::new("hello again!".as_bytes());
981        let body_2 = Body::new("final hello!".as_bytes());
982
983        let (hash_0, header_0, header_bytes_0) =
984            create_operation(&private_key, &body_0, 0, 0, None);
985        let (hash_1, header_1, header_bytes_1) =
986            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
987        let (hash_2, header_2, header_bytes_2) =
988            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
989
990        store
991            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
992            .await
993            .expect("no errors");
994        store
995            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
996            .await
997            .expect("no errors");
998        store
999            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
1000            .await
1001            .expect("no errors");
1002
1003        // There is one log and 3 operations.
1004        assert_eq!(store.read_store().logs.len(), 1);
1005        assert_eq!(store.read_store().operations.len(), 3);
1006
1007        // Delete all operation payloads from sequence number 0 up to but not including 2.
1008        let deleted = store
1009            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
1010            .await
1011            .expect("no errors");
1012        assert!(deleted);
1013
1014        let log = store
1015            .get_log(&private_key.public_key(), &log_id, None)
1016            .await
1017            .expect("no errors")
1018            .expect("log should exist");
1019
1020        assert_eq!(log[0].1, None);
1021        assert_eq!(log[1].1, None);
1022        assert_eq!(log[2].1, Some(body_2));
1023    }
1024}