p2panda_store/
memory.rs

1// SPDX-License-Identifier: MIT OR Apache-2.0
2
3//! In-memory persistence for p2panda operations and logs.
4use std::collections::{BTreeSet, HashMap};
5use std::convert::Infallible;
6use std::fmt::Debug;
7use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
8
9use p2panda_core::{Body, Extensions, Hash, Header, PublicKey, RawOperation};
10
11use crate::{LogId, LogStore, OperationStore};
12
13type SeqNum = u64;
14type Timestamp = u64;
15type RawHeader = Vec<u8>;
16
17type LogMeta = (SeqNum, Timestamp, Hash);
18type StoredOperation<L, E> = (L, Header<E>, Option<Body>, RawHeader);
19
20/// An in-memory store for core p2panda data types: `Operation` and `Log`.
21#[derive(Clone, Debug)]
22pub struct InnerMemoryStore<L, E> {
23    operations: HashMap<Hash, StoredOperation<L, E>>,
24    logs: HashMap<(PublicKey, L), BTreeSet<LogMeta>>,
25}
26
27/// An in-memory store for core p2panda data types: `Operation` and log.
28///
29/// `MemoryStore` supports usage in asynchronous and multi-threaded contexts by wrapping an
30/// `InnerMemoryStore` with an `RwLock` and `Arc`. Convenience methods are provided to obtain a
31/// read- or write-lock on the underlying store.
32#[derive(Clone, Debug)]
33pub struct MemoryStore<L, E = ()> {
34    inner: Arc<RwLock<InnerMemoryStore<L, E>>>,
35}
36
37impl<L, E> MemoryStore<L, E> {
38    /// Create a new in-memory store.
39    pub fn new() -> Self {
40        let inner = InnerMemoryStore {
41            operations: HashMap::new(),
42            logs: HashMap::new(),
43        };
44
45        Self {
46            inner: Arc::new(RwLock::new(inner)),
47        }
48    }
49}
50
51impl<T> Default for MemoryStore<T, ()> {
52    fn default() -> Self {
53        Self::new()
54    }
55}
56
57impl<T, E> MemoryStore<T, E> {
58    /// Obtain a read-lock on the store.
59    pub fn read_store(&self) -> RwLockReadGuard<InnerMemoryStore<T, E>> {
60        self.inner
61            .read()
62            .expect("acquire shared read access on store")
63    }
64
65    /// Obtain a write-lock on the store.
66    pub fn write_store(&self) -> RwLockWriteGuard<InnerMemoryStore<T, E>> {
67        self.inner
68            .write()
69            .expect("acquire exclusive write access on store")
70    }
71}
72
73impl<L, E> OperationStore<L, E> for MemoryStore<L, E>
74where
75    L: LogId + Send + Sync,
76    E: Extensions + Send + Sync,
77{
78    type Error = Infallible;
79
80    async fn insert_operation(
81        &mut self,
82        hash: Hash,
83        header: &Header<E>,
84        body: Option<&Body>,
85        header_bytes: &[u8],
86        log_id: &L,
87    ) -> Result<bool, Self::Error> {
88        let mut store = self.write_store();
89
90        let log_meta = (header.seq_num, header.timestamp, hash);
91        let insertion_occured = store
92            .logs
93            .entry((header.public_key, log_id.to_owned()))
94            .or_default()
95            .insert(log_meta);
96
97        if insertion_occured {
98            let entry = (
99                log_id.to_owned(),
100                header.to_owned(),
101                body.cloned(),
102                header_bytes.to_vec(),
103            );
104            store.operations.insert(hash, entry);
105        }
106
107        Ok(insertion_occured)
108    }
109
110    async fn get_operation(
111        &self,
112        hash: Hash,
113    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
114        match self.read_store().operations.get(&hash) {
115            Some((_, header, body, _)) => Ok(Some((header.clone(), body.clone()))),
116            None => Ok(None),
117        }
118    }
119
120    async fn get_raw_operation(&self, hash: Hash) -> Result<Option<RawOperation>, Self::Error> {
121        match self.read_store().operations.get(&hash) {
122            Some((_, _, body, header_bytes)) => Ok(Some((
123                header_bytes.clone(),
124                body.as_ref().map(|body| body.to_bytes()),
125            ))),
126            None => Ok(None),
127        }
128    }
129
130    async fn has_operation(&self, hash: Hash) -> Result<bool, Self::Error> {
131        Ok(self.read_store().operations.contains_key(&hash))
132    }
133
134    async fn delete_operation(&mut self, hash: Hash) -> Result<bool, Self::Error> {
135        let mut store = self.write_store();
136        let Some((_, header, _, _)) = store.operations.remove(&hash) else {
137            return Ok(false);
138        };
139        store.logs = store
140            .logs
141            .clone()
142            .into_iter()
143            .filter_map(|(key, mut log)| {
144                log.remove(&(header.seq_num, header.timestamp, hash));
145                if log.is_empty() {
146                    None
147                } else {
148                    Some((key, log))
149                }
150            })
151            .collect();
152
153        Ok(true)
154    }
155
156    async fn delete_payload(&mut self, hash: Hash) -> Result<bool, Self::Error> {
157        if let Some(operation) = self.write_store().operations.get_mut(&hash) {
158            operation.2 = None;
159            Ok(true)
160        } else {
161            Ok(false)
162        }
163    }
164}
165
166impl<L, E> LogStore<L, E> for MemoryStore<L, E>
167where
168    L: LogId + Send + Sync,
169    E: Extensions + Send + Sync,
170{
171    type Error = Infallible;
172
173    async fn get_log(
174        &self,
175        public_key: &PublicKey,
176        log_id: &L,
177        from: Option<u64>,
178    ) -> Result<Option<Vec<(Header<E>, Option<Body>)>>, Self::Error> {
179        let store = self.read_store();
180        match store.logs.get(&(*public_key, log_id.to_owned())) {
181            Some(log) => {
182                let mut result = Vec::new();
183                if let Some(from) = from {
184                    log.iter().for_each(|(seq_num, _, hash)| {
185                        if *seq_num >= from {
186                            let (_, header, body, _) =
187                                store.operations.get(hash).expect("exists in hash map");
188                            result.push((header.to_owned(), body.to_owned()));
189                        }
190                    });
191                } else {
192                    log.iter().for_each(|(_, _, hash)| {
193                        let (_, header, body, _) =
194                            store.operations.get(hash).expect("exists in hash map");
195                        result.push((header.to_owned(), body.to_owned()));
196                    });
197                }
198                Ok(Some(result))
199            }
200            None => Ok(None),
201        }
202    }
203
204    async fn get_raw_log(
205        &self,
206        public_key: &PublicKey,
207        log_id: &L,
208        from: Option<u64>,
209    ) -> Result<Option<Vec<RawOperation>>, Self::Error> {
210        let store = self.read_store();
211        match store.logs.get(&(*public_key, log_id.to_owned())) {
212            Some(log) => {
213                let mut result = Vec::new();
214                if let Some(from) = from {
215                    log.iter().for_each(|(seq_num, _, hash)| {
216                        if *seq_num >= from {
217                            let (_, _, body, header_bytes) =
218                                store.operations.get(hash).expect("exists in hash map");
219                            result.push((
220                                header_bytes.clone(),
221                                body.as_ref().map(|body| body.to_bytes()),
222                            ));
223                        }
224                    });
225                } else {
226                    log.iter().for_each(|(_, _, hash)| {
227                        let (_, _, body, header_bytes) =
228                            store.operations.get(hash).expect("exists in hash map");
229                        result.push((
230                            header_bytes.clone(),
231                            body.as_ref().map(|body| body.to_bytes()),
232                        ));
233                    });
234                }
235                Ok(Some(result))
236            }
237            None => Ok(None),
238        }
239    }
240
241    async fn latest_operation(
242        &self,
243        public_key: &PublicKey,
244        log_id: &L,
245    ) -> Result<Option<(Header<E>, Option<Body>)>, Self::Error> {
246        let store = self.read_store();
247
248        let Some(log) = store.logs.get(&(*public_key, log_id.to_owned())) else {
249            return Ok(None);
250        };
251
252        let Some((_, _, hash)) = log.last() else {
253            return Ok(None);
254        };
255
256        let Some((_, header, body, _)) = store.operations.get(hash) else {
257            return Ok(None);
258        };
259
260        Ok(Some((header.to_owned(), body.to_owned())))
261    }
262
263    async fn delete_operations(
264        &mut self,
265        public_key: &PublicKey,
266        log_id: &L,
267        before: u64,
268    ) -> Result<bool, Self::Error> {
269        let mut deleted = vec![];
270        let mut store = self.write_store();
271        if let Some(log) = store.logs.get_mut(&(*public_key, log_id.to_owned())) {
272            log.retain(|(seq_num, _, hash)| {
273                let remove = *seq_num < before;
274                if remove {
275                    deleted.push(*hash);
276                };
277                !remove
278            });
279        };
280        store.operations.retain(|hash, _| !deleted.contains(hash));
281        Ok(!deleted.is_empty())
282    }
283
284    async fn delete_payloads(
285        &mut self,
286        public_key: &PublicKey,
287        log_id: &L,
288        from: u64,
289        to: u64,
290    ) -> Result<bool, Self::Error> {
291        let mut deleted = vec![];
292        {
293            let store = self.read_store();
294            if let Some(log) = store.logs.get(&(*public_key, log_id.to_owned())) {
295                log.iter().for_each(|(seq_num, _, hash)| {
296                    if *seq_num >= from && *seq_num < to {
297                        deleted.push(*hash)
298                    };
299                });
300            };
301        }
302        let mut store = self.write_store();
303        for hash in &deleted {
304            let operation = store
305                .operations
306                .get_mut(hash)
307                .expect("operation exists in store");
308            operation.2 = None;
309        }
310        Ok(!deleted.is_empty())
311    }
312
313    async fn get_log_heights(&self, log_id: &L) -> Result<Vec<(PublicKey, SeqNum)>, Self::Error> {
314        let log_heights = self
315            .read_store()
316            .logs
317            .iter()
318            .filter_map(|((public_key, inner_log_id), log)| {
319                if inner_log_id == log_id {
320                    let log_height = log
321                        .last()
322                        .expect("all logs contain at least one operation")
323                        .0;
324                    Some((*public_key, log_height))
325                } else {
326                    None
327                }
328            })
329            .collect();
330        Ok(log_heights)
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use p2panda_core::{Body, Hash, Header, PrivateKey};
337    use serde::{Deserialize, Serialize};
338
339    use crate::{LogStore, OperationStore};
340
341    use super::MemoryStore;
342
343    fn create_operation(
344        private_key: &PrivateKey,
345        body: &Body,
346        seq_num: u64,
347        timestamp: u64,
348        backlink: Option<Hash>,
349    ) -> (Hash, Header<()>, Vec<u8>) {
350        let mut header = Header {
351            version: 1,
352            public_key: private_key.public_key(),
353            signature: None,
354            payload_size: body.size(),
355            payload_hash: Some(body.hash()),
356            timestamp,
357            seq_num,
358            backlink,
359            previous: vec![],
360            extensions: None,
361        };
362        header.sign(private_key);
363        let header_bytes = header.to_bytes();
364        (header.hash(), header, header_bytes)
365    }
366
367    #[tokio::test]
368    async fn default_memory_store() {
369        let mut store = MemoryStore::default();
370        let private_key = PrivateKey::new();
371        let body = Body::new("hello!".as_bytes());
372
373        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
374        let inserted = store
375            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
376            .await
377            .expect("no errors");
378        assert!(inserted);
379    }
380
381    #[tokio::test]
382    async fn generic_extensions_mem_store() {
383        // Define our own custom extension type.
384        #[derive(Clone, Debug, Default, Serialize, Deserialize)]
385        struct MyExtension {}
386
387        // Construct a new store.
388        let mut store = MemoryStore::new();
389
390        // Construct an operation using the custom extension.
391        let private_key = PrivateKey::new();
392        let body = Body::new("hello!".as_bytes());
393        let mut header = Header {
394            version: 1,
395            public_key: private_key.public_key(),
396            signature: None,
397            payload_size: body.size(),
398            payload_hash: Some(body.hash()),
399            timestamp: 0,
400            seq_num: 0,
401            backlink: None,
402            previous: vec![],
403            extensions: Some(MyExtension {}),
404        };
405        header.sign(&private_key);
406
407        // Insert the operation into the store, the extension type is inferred.
408        let inserted = store
409            .insert_operation(header.hash(), &header, Some(&body), &header.to_bytes(), &0)
410            .await
411            .expect("no errors");
412        assert!(inserted);
413    }
414
415    #[tokio::test]
416    async fn insert_get_operation() {
417        let mut store = MemoryStore::default();
418        let private_key = PrivateKey::new();
419        let body = Body::new("hello!".as_bytes());
420
421        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
422
423        let inserted = store
424            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
425            .await
426            .expect("no errors");
427        assert!(inserted);
428        assert!(store.has_operation(hash).await.expect("no error"));
429
430        let (header_again, body_again) = store
431            .get_operation(hash)
432            .await
433            .expect("no error")
434            .expect("operation exist");
435
436        assert_eq!(header.hash(), header_again.hash());
437        assert_eq!(Some(body.clone()), body_again);
438
439        let (header_bytes_again, body_bytes_again) = store
440            .get_raw_operation(hash)
441            .await
442            .expect("no error")
443            .expect("operation exist");
444
445        assert_eq!(header_bytes_again, header_bytes);
446        assert_eq!(body_bytes_again, Some(body.to_bytes()));
447    }
448
449    #[tokio::test]
450    async fn delete_operation() {
451        let mut store: MemoryStore<i32> = MemoryStore::default();
452        let private_key = PrivateKey::new();
453        let body = Body::new("hello!".as_bytes());
454
455        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
456
457        // Insert one operation.
458        let inserted = store
459            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
460            .await
461            .expect("no errors");
462        assert!(inserted);
463
464        // We expect one log and one operation.
465        assert_eq!(store.read_store().logs.len(), 1);
466        assert_eq!(store.read_store().operations.len(), 1);
467
468        // Delete the operation.
469        assert!(store.delete_operation(hash).await.expect("no error"));
470
471        // We expect no logs and no operations.
472        assert_eq!(store.read_store().logs.len(), 0);
473        assert_eq!(store.read_store().operations.len(), 0);
474
475        let deleted_operation = store.get_operation(hash).await.expect("no error");
476        assert!(deleted_operation.is_none());
477        assert!(!store.has_operation(hash).await.expect("no error"));
478
479        let deleted_raw_operation = store.get_raw_operation(hash).await.expect("no error");
480        assert!(deleted_raw_operation.is_none());
481    }
482
483    #[tokio::test]
484    async fn delete_payload() {
485        let mut store = MemoryStore::default();
486        let private_key = PrivateKey::new();
487        let body = Body::new("hello!".as_bytes());
488
489        let (hash, header, header_bytes) = create_operation(&private_key, &body, 0, 0, None);
490
491        let inserted = store
492            .insert_operation(hash, &header, Some(&body), &header_bytes, &0)
493            .await
494            .expect("no errors");
495        assert!(inserted);
496
497        assert!(store.delete_payload(hash).await.expect("no error"));
498
499        let (_, no_body) = store
500            .get_operation(hash)
501            .await
502            .expect("no error")
503            .expect("operation exist");
504        assert!(no_body.is_none());
505        assert!(store.has_operation(hash).await.expect("no error"));
506
507        let (_, no_body) = store
508            .get_raw_operation(hash)
509            .await
510            .expect("no error")
511            .expect("operation exist");
512        assert!(no_body.is_none());
513    }
514
515    #[tokio::test]
516    async fn get_log() {
517        let mut store = MemoryStore::default();
518        let private_key = PrivateKey::new();
519        let log_id = 0;
520
521        let body_0 = Body::new("hello!".as_bytes());
522        let body_1 = Body::new("hello again!".as_bytes());
523        let body_2 = Body::new("hello for a third time!".as_bytes());
524
525        let (hash_0, header_0, header_bytes_0) =
526            create_operation(&private_key, &body_0, 0, 0, None);
527        let (hash_1, header_1, header_bytes_1) =
528            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
529        let (hash_2, header_2, header_bytes_2) =
530            create_operation(&private_key, &body_2, 2, 0, Some(hash_1));
531
532        store
533            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &0)
534            .await
535            .expect("no errors");
536        store
537            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &0)
538            .await
539            .expect("no errors");
540        store
541            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &0)
542            .await
543            .expect("no errors");
544
545        // Get all log operations.
546        let log = store
547            .get_log(&private_key.public_key(), &log_id, None)
548            .await
549            .expect("no errors")
550            .expect("log should exist");
551
552        assert_eq!(log.len(), 3);
553        assert_eq!(log[0].0.hash(), hash_0);
554        assert_eq!(log[1].0.hash(), hash_1);
555        assert_eq!(log[2].0.hash(), hash_2);
556        assert_eq!(log[0].1, Some(body_0.clone()));
557        assert_eq!(log[1].1, Some(body_1.clone()));
558        assert_eq!(log[2].1, Some(body_2.clone()));
559
560        // Get all log operations starting from sequence number 1.
561        let log = store
562            .get_log(&private_key.public_key(), &log_id, Some(1))
563            .await
564            .expect("no errors")
565            .expect("log should exist");
566
567        assert_eq!(log.len(), 2);
568        assert_eq!(log[0].0.hash(), hash_1);
569        assert_eq!(log[1].0.hash(), hash_2);
570        assert_eq!(log[0].1, Some(body_1.clone()));
571        assert_eq!(log[1].1, Some(body_2.clone()));
572
573        // Get all raw log operations.
574        let log = store
575            .get_raw_log(&private_key.public_key(), &log_id, None)
576            .await
577            .expect("no errors")
578            .expect("log should exist");
579
580        assert_eq!(log.len(), 3);
581        assert_eq!(log[0].0, header_bytes_0);
582        assert_eq!(log[1].0, header_bytes_1);
583        assert_eq!(log[2].0, header_bytes_2);
584        assert_eq!(log[0].1, Some(body_0.to_bytes()));
585        assert_eq!(log[1].1, Some(body_1.to_bytes()));
586        assert_eq!(log[2].1, Some(body_2.to_bytes()));
587
588        // Get all raw log operations starting from sequence number 1.
589        let log = store
590            .get_raw_log(&private_key.public_key(), &log_id, Some(1))
591            .await
592            .expect("no errors")
593            .expect("log should exist");
594
595        assert_eq!(log.len(), 2);
596        assert_eq!(log[0].0, header_bytes_1);
597        assert_eq!(log[1].0, header_bytes_2);
598        assert_eq!(log[0].1, Some(body_1.to_bytes()));
599        assert_eq!(log[1].1, Some(body_2.to_bytes()));
600    }
601
602    #[tokio::test]
603    async fn insert_many_get_one_log() {
604        let mut store = MemoryStore::default();
605        let private_key = PrivateKey::new();
606        let log_a_id = "a";
607        let log_b_id = "b";
608
609        let body_a0 = Body::new("hello from log a!".as_bytes());
610        let body_a1 = Body::new("hello from log a again!".as_bytes());
611        let (hash_a0, header_a0, header_bytes_a0) =
612            create_operation(&private_key, &body_a0, 0, 0, None);
613        let (hash_a1, header_a1, header_bytes_a1) =
614            create_operation(&private_key, &body_a1, 1, 1, Some(hash_a0));
615
616        let inserted = store
617            .insert_operation(
618                hash_a0,
619                &header_a0,
620                Some(&body_a0),
621                &header_bytes_a0,
622                &log_a_id,
623            )
624            .await
625            .expect("no errors");
626        assert!(inserted);
627
628        let inserted = store
629            .insert_operation(
630                hash_a1,
631                &header_a1,
632                Some(&body_a1),
633                &header_bytes_a1,
634                &log_a_id,
635            )
636            .await
637            .expect("no errors");
638        assert!(inserted);
639
640        let body_b0 = Body::new("hello from log b!".as_bytes());
641        let body_b1 = Body::new("hello from log b again!".as_bytes());
642        let (hash_b0, header_b0, header_bytes_b0) =
643            create_operation(&private_key, &body_b0, 0, 3, None);
644        let (hash_b1, header_b1, header_bytes_b1) =
645            create_operation(&private_key, &body_b1, 1, 4, Some(hash_b0));
646
647        store
648            .insert_operation(
649                hash_b0,
650                &header_b0,
651                Some(&body_b0),
652                &header_bytes_b0,
653                &log_b_id,
654            )
655            .await
656            .expect("no errors");
657
658        store
659            .insert_operation(
660                hash_b1,
661                &header_b1,
662                Some(&body_b1),
663                &header_bytes_b1,
664                &log_b_id,
665            )
666            .await
667            .expect("no errors");
668
669        let log_a = store
670            .get_log(&private_key.public_key(), &log_a_id, None)
671            .await
672            .expect("no errors")
673            .expect("log should exist");
674
675        assert_eq!(log_a.len(), 2);
676        assert_eq!(log_a[0].0.hash(), header_a0.hash());
677        assert_eq!(log_a[1].0.hash(), header_a1.hash());
678
679        let log_b = store
680            .get_log(&private_key.public_key(), &log_b_id, None)
681            .await
682            .expect("no errors")
683            .expect("log should exist");
684
685        assert_eq!(log_b.len(), 2);
686        assert_eq!(log_b[0].0.hash(), header_b0.hash());
687        assert_eq!(log_b[1].0.hash(), header_b1.hash());
688    }
689
690    #[tokio::test]
691    async fn many_authors_same_log_id() {
692        let mut store = MemoryStore::default();
693        let private_key_a = PrivateKey::new();
694        let private_key_b = PrivateKey::new();
695        let log_id = 0;
696        let body = Body::new("hello!".as_bytes());
697
698        let (hash_a, header_a, header_bytes_a) =
699            create_operation(&private_key_a, &body, 0, 0, None);
700        let inserted = store
701            .insert_operation(hash_a, &header_a, Some(&body), &header_bytes_a, &log_id)
702            .await
703            .expect("no errors");
704        assert!(inserted);
705
706        let (hash_b, header_b, header_bytes_b) =
707            create_operation(&private_key_b, &body, 0, 0, None);
708        let inserted = store
709            .insert_operation(hash_b, &header_b, Some(&body), &header_bytes_b, &log_id)
710            .await
711            .expect("no errors");
712        assert!(inserted);
713
714        let author_a_log = store
715            .get_log(&private_key_a.public_key(), &log_id, None)
716            .await
717            .expect("no errors")
718            .expect("log should exist");
719
720        assert_eq!(author_a_log.len(), 1);
721        assert_eq!(author_a_log[0].0.hash(), header_a.hash());
722
723        let author_b_log = store
724            .get_log(&private_key_b.public_key(), &log_id, None)
725            .await
726            .expect("no errors")
727            .expect("log should exist");
728
729        assert_eq!(author_b_log.len(), 1);
730        assert_eq!(author_b_log[0].0.hash(), header_b.hash());
731    }
732
733    #[tokio::test]
734    async fn get_latest_operation() {
735        let mut store = MemoryStore::default();
736        let private_key = PrivateKey::new();
737        let log_id = 0;
738
739        let body_0 = Body::new("hello!".as_bytes());
740        let body_1 = Body::new("hello again!".as_bytes());
741
742        let (hash_0, header_0, header_bytes_0) =
743            create_operation(&private_key, &body_0, 0, 0, None);
744        let (hash_1, header_1, header_bytes_1) =
745            create_operation(&private_key, &body_1, 1, 0, Some(hash_0));
746
747        store
748            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
749            .await
750            .expect("no errors");
751        store
752            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
753            .await
754            .expect("no errors");
755
756        let (latest_header, latest_body) = store
757            .latest_operation(&private_key.public_key(), &log_id)
758            .await
759            .expect("no errors")
760            .expect("there's an operation");
761
762        assert_eq!(latest_header.hash(), header_1.hash());
763        assert_eq!(latest_body, Some(body_1));
764    }
765
766    #[tokio::test]
767    async fn delete_operations() {
768        let mut store = MemoryStore::default();
769        let private_key = PrivateKey::new();
770        let log_id = 0;
771
772        let body_0 = Body::new("hello!".as_bytes());
773        let body_1 = Body::new("hello again!".as_bytes());
774        let body_2 = Body::new("final hello!".as_bytes());
775
776        let (hash_0, header_0, header_bytes_0) =
777            create_operation(&private_key, &body_0, 0, 0, None);
778        let (hash_1, header_1, header_bytes_1) =
779            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
780        let (hash_2, header_2, header_bytes_2) =
781            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
782
783        store
784            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
785            .await
786            .expect("no errors");
787        store
788            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
789            .await
790            .expect("no errors");
791        store
792            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
793            .await
794            .expect("no errors");
795
796        // We expect one log and 3 operations.
797        assert_eq!(store.read_store().logs.len(), 1);
798        assert_eq!(store.read_store().operations.len(), 3);
799
800        // Delete all operations _before_ seq_num 2.
801        let deleted = store
802            .delete_operations(&private_key.public_key(), &log_id, 2)
803            .await
804            .expect("no errors");
805        assert!(deleted);
806
807        // There is now only one operation in the log.
808        assert_eq!(store.read_store().logs.len(), 1);
809        assert_eq!(store.read_store().operations.len(), 1);
810
811        // The remaining operation in the log should be the latest (seq_num == 2).
812        let log = store
813            .get_log(&private_key.public_key(), &log_id, None)
814            .await
815            .expect("no errors")
816            .expect("log should exist");
817        assert_eq!(log[0].0.hash(), header_2.hash());
818
819        // Deleting the same range again should return `false`, meaning no deletion occurred.
820        let deleted = store
821            .delete_operations(&private_key.public_key(), &log_id, 2)
822            .await
823            .expect("no errors");
824        assert!(!deleted);
825    }
826
827    #[tokio::test]
828    async fn delete_payloads() {
829        let mut store = MemoryStore::default();
830        let private_key = PrivateKey::new();
831        let log_id = 0;
832
833        let body_0 = Body::new("hello!".as_bytes());
834        let body_1 = Body::new("hello again!".as_bytes());
835        let body_2 = Body::new("final hello!".as_bytes());
836
837        let (hash_0, header_0, header_bytes_0) =
838            create_operation(&private_key, &body_0, 0, 0, None);
839        let (hash_1, header_1, header_bytes_1) =
840            create_operation(&private_key, &body_1, 1, 100, Some(hash_0));
841        let (hash_2, header_2, header_bytes_2) =
842            create_operation(&private_key, &body_2, 2, 200, Some(hash_1));
843
844        store
845            .insert_operation(hash_0, &header_0, Some(&body_0), &header_bytes_0, &log_id)
846            .await
847            .expect("no errors");
848        store
849            .insert_operation(hash_1, &header_1, Some(&body_1), &header_bytes_1, &log_id)
850            .await
851            .expect("no errors");
852        store
853            .insert_operation(hash_2, &header_2, Some(&body_2), &header_bytes_2, &log_id)
854            .await
855            .expect("no errors");
856
857        // There is one log and 3 operations.
858        assert_eq!(store.read_store().logs.len(), 1);
859        assert_eq!(store.read_store().operations.len(), 3);
860
861        // Delete all operation payloads from sequence number 0 up to but not including 2.
862        let deleted = store
863            .delete_payloads(&private_key.public_key(), &log_id, 0, 2)
864            .await
865            .expect("no errors");
866        assert!(deleted);
867
868        let log = store
869            .get_log(&private_key.public_key(), &log_id, None)
870            .await
871            .expect("no errors")
872            .expect("log should exist");
873
874        assert_eq!(log[0].1, None);
875        assert_eq!(log[1].1, None);
876        assert_eq!(log[2].1, Some(body_2));
877    }
878}