1use crate::helpers::{check_timestamp_for_liveness, fmt_id};
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkos_node_bft_storage_service::StorageService;
19use snarkvm::{
20 ledger::{
21 block::{Block, Transaction},
22 narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID},
23 },
24 prelude::{Address, Field, Network, Result, anyhow, bail, ensure},
25 utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by},
26};
27
28use anyhow::Context;
29use indexmap::{IndexMap, IndexSet, map::Entry};
30#[cfg(feature = "locktick")]
31use locktick::parking_lot::RwLock;
32use lru::LruCache;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::RwLock;
35#[cfg(not(feature = "serial"))]
36use rayon::prelude::*;
37use std::{
38 collections::{HashMap, HashSet},
39 num::NonZeroUsize,
40 sync::{
41 Arc,
42 atomic::{AtomicU32, AtomicU64, Ordering},
43 },
44};
45
46#[derive(Clone, Debug)]
47pub struct Storage<N: Network>(Arc<StorageInner<N>>);
48
49impl<N: Network> std::ops::Deref for Storage<N> {
50 type Target = Arc<StorageInner<N>>;
51
52 fn deref(&self) -> &Self::Target {
53 &self.0
54 }
55}
56
/// The shared state behind a [`Storage`] handle.
#[derive(Debug)]
pub struct StorageInner<N: Network> {
    /// The ledger service, used for committee lookups and ledger membership queries.
    ledger: Arc<dyn LedgerService<N>>,
    /// The current height; only ever raised by `sync_height_with_block`.
    current_height: AtomicU32,
    /// The current round; initialized from the committee's starting round (at least 1).
    current_round: AtomicU64,
    /// The garbage-collection round; certificates at or below it are not accepted.
    gc_round: AtomicU64,
    /// The number of rounds subtracted from a round to obtain its GC cut-off.
    max_gc_rounds: u64,
    /// The map of `round` to the set of `(certificate ID, author)` pairs stored for that round.
    rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>,
    /// An LRU cache of certificates that were received but not yet inserted into storage.
    unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
    /// The map of `certificate ID` to the stored certificate.
    certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
    /// The map of `batch ID` to the round of the batch.
    batch_ids: RwLock<IndexMap<Field<N>, u64>>,
    /// The storage service holding the transmissions referenced by stored certificates.
    transmissions: Arc<dyn StorageService<N>>,
}
108
109impl<N: Network> Storage<N> {
110 pub fn new(
112 ledger: Arc<dyn LedgerService<N>>,
113 transmissions: Arc<dyn StorageService<N>>,
114 max_gc_rounds: u64,
115 ) -> Result<Self> {
116 let committee = ledger.current_committee().expect("Ledger is missing a committee.");
119 let current_round = committee.starting_round().max(1);
121 let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES() * 2) as usize).unwrap();
123
124 let storage = Self(Arc::new(StorageInner {
126 ledger,
127 current_height: Default::default(),
128 current_round: AtomicU64::new(current_round),
129 gc_round: Default::default(),
130 max_gc_rounds,
131 rounds: Default::default(),
132 unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
133 certificates: Default::default(),
134 batch_ids: Default::default(),
135 transmissions,
136 }));
137
138 storage.garbage_collect_certificates(current_round)?;
141
142 Ok(storage)
144 }
145}
146
147impl<N: Network> Storage<N> {
148 pub fn current_height(&self) -> u32 {
150 self.current_height.load(Ordering::SeqCst)
152 }
153}
154
155impl<N: Network> Storage<N> {
156 pub fn current_round(&self) -> u64 {
158 self.current_round.load(Ordering::SeqCst)
160 }
161
162 pub fn gc_round(&self) -> u64 {
167 self.gc_round.load(Ordering::SeqCst)
169 }
170
171 pub fn max_gc_rounds(&self) -> u64 {
173 self.max_gc_rounds
174 }
175
176 pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> {
179 let next_round = current_round + 1;
181
182 {
184 let storage_round = self.current_round();
186 if next_round < storage_round {
188 return Ok(storage_round);
189 }
190
191 trace!("Incrementing storage from round {storage_round} to {next_round}");
192 }
193
194 let current_committee = self.ledger.current_committee()?;
196 let starting_round = current_committee.starting_round();
198 if next_round < starting_round {
200 let latest_block_round = self.ledger.latest_round();
202 info!(
204 "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..."
205 );
206 self.sync_round_with_block(latest_block_round);
208 return Ok(latest_block_round);
210 }
211
212 self.update_current_round(next_round);
214
215 #[cfg(feature = "metrics")]
216 metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64);
217
218 let storage_round = self.current_round();
220 let gc_round = self.gc_round();
222 ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})");
224 ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}");
226
227 info!("Starting round {next_round}...");
229 Ok(next_round)
230 }
231
232 fn update_current_round(&self, next_round: u64) {
234 self.current_round.store(next_round, Ordering::SeqCst);
236 }
237
238 pub(crate) fn garbage_collect_certificates(&self, next_round: u64) -> Result<()> {
240 let current_gc_round = self.gc_round();
242 let next_gc_round = next_round.saturating_sub(self.max_gc_rounds);
244 if next_gc_round > current_gc_round {
246 if self
247 .gc_round
248 .compare_exchange(current_gc_round, next_gc_round, Ordering::SeqCst, Ordering::SeqCst)
249 .is_err()
250 {
251 bail!("Concurrent updates to GC round detected.");
252 }
253
254 for gc_round in current_gc_round..=next_gc_round {
256 for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
258 trace!(
259 "Garbage collecting certificate {id} at round {gc_round} (cut-off is round {next_gc_round})"
260 );
261 self.remove_certificate(id);
262 }
263 }
264 self.gc_round.store(next_gc_round, Ordering::SeqCst);
266 } else if next_gc_round < current_gc_round {
267 bail!("Attempted to decrease GC round from {current_gc_round} to {next_gc_round}");
268 }
269
270 Ok(())
271 }
272}
273
274impl<N: Network> Storage<N> {
275 pub fn contains_certificates_for_round(&self, round: u64) -> bool {
277 self.rounds.read().contains_key(&round)
279 }
280
281 pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool {
283 self.certificates.read().contains_key(&certificate_id)
285 }
286
287 pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool {
289 self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author))
290 }
291
292 pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
294 self.unprocessed_certificates.read().contains(&certificate_id)
296 }
297
298 pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
300 self.batch_ids.read().contains_key(&batch_id)
302 }
303
304 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
306 self.transmissions.contains_transmission(transmission_id.into())
307 }
308
309 pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
312 self.transmissions.get_transmission(transmission_id.into())
313 }
314
315 pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> {
318 self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
320 }
321
322 pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
325 self.batch_ids.read().get(&batch_id).copied()
327 }
328
329 pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> {
332 self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
334 }
335
336 pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
339 self.certificates.read().get(&certificate_id).cloned()
341 }
342
343 pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
346 self.unprocessed_certificates.read().peek(&certificate_id).cloned()
348 }
349
350 pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> {
354 if let Some(entries) = self.rounds.read().get(&round) {
356 let certificates = self.certificates.read();
357 entries.iter().find_map(
358 |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None },
359 )
360 } else {
361 Default::default()
362 }
363 }
364
365 pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> {
368 if round == 0 {
370 return Default::default();
371 }
372 if let Some(entries) = self.rounds.read().get(&round) {
374 let certificates = self.certificates.read();
375 entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect()
376 } else {
377 Default::default()
378 }
379 }
380
381 pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
384 if round == 0 {
386 return Default::default();
387 }
388 if let Some(entries) = self.rounds.read().get(&round) {
390 entries.iter().map(|(certificate_id, _)| *certificate_id).collect()
391 } else {
392 Default::default()
393 }
394 }
395
396 pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
399 if round == 0 {
401 return Default::default();
402 }
403 if let Some(entries) = self.rounds.read().get(&round) {
405 entries.iter().map(|(_, author)| *author).collect()
406 } else {
407 Default::default()
408 }
409 }
410
411 pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
414 let rounds = self.rounds.read();
416 let certificates = self.certificates.read();
417
418 cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b))
420 .flat_map(|(_, certificates_for_round)| {
421 cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| {
423 if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
425 None
426 } else {
427 certificates.get(&certificate_id).cloned()
429 }
430 })
431 })
432 .collect()
433 }
434
435 pub fn check_batch_header(
459 &self,
460 batch_header: &BatchHeader<N>,
461 transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
462 aborted_transmissions: HashSet<TransmissionID<N>>,
463 ) -> Result<Option<HashMap<TransmissionID<N>, Transmission<N>>>> {
464 let round = batch_header.round();
466 let gc_round = self.gc_round();
468 let gc_log = format!("(gc = {gc_round})");
470
471 if self.contains_batch(batch_header.batch_id()) {
473 debug!("Batch for round {round} already exists in storage {gc_log}");
474 return Ok(None);
475 }
476
477 let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
479 bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}")
480 };
481 if !committee_lookback.is_committee_member(batch_header.author()) {
483 bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author())
484 }
485
486 check_timestamp_for_liveness(batch_header.timestamp())?;
488
489 let missing_transmissions = self
491 .transmissions
492 .find_missing_transmissions(batch_header, transmissions, aborted_transmissions)
493 .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?;
494
495 let previous_round = round.saturating_sub(1);
497 if previous_round > gc_round {
499 let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
501 bail!("Missing committee for the previous round {previous_round} in storage {gc_log}")
502 };
503 if !self.contains_certificates_for_round(previous_round) {
505 bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}")
506 }
507 if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() {
509 bail!("Too many previous certificates for round {round} {gc_log}")
510 }
511 let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len());
513 for previous_certificate_id in batch_header.previous_certificate_ids() {
515 let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
517 bail!(
518 "Missing previous certificate '{}' for certificate in round {round} {gc_log}",
519 fmt_id(previous_certificate_id)
520 )
521 };
522 if previous_certificate.round() != previous_round {
524 bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}")
525 }
526 if previous_authors.contains(&previous_certificate.author()) {
528 bail!("Round {round} certificate contains a duplicate author {gc_log}")
529 }
530 previous_authors.insert(previous_certificate.author());
532 }
533 if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) {
535 bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}")
536 }
537 }
538
539 Ok(Some(missing_transmissions))
540 }
541
542 pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> {
554 let certificate_author = certificate.author();
556 let certificate_round = certificate.round();
557
558 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
560
561 let mut signers: HashSet<Address<N>> =
564 certificate.signatures().map(|signature| signature.to_address()).collect();
565 signers.insert(certificate_author);
566 ensure!(
567 committee_lookback.is_quorum_threshold_reached(&signers),
568 "Certificate '{}' for round {certificate_round} does not meet quorum requirements",
569 certificate.id()
570 );
571
572 cfg_iter!(&signers).try_for_each(|signer| {
574 ensure!(
575 committee_lookback.is_committee_member(*signer),
576 "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee",
577 certificate.id()
578 );
579 Ok(())
580 })?;
581
582 Ok(())
583 }
584
585 pub fn check_certificate(
607 &self,
608 certificate: &BatchCertificate<N>,
609 transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
610 aborted_transmissions: HashSet<TransmissionID<N>>,
611 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
612 let round = certificate.round();
614 let gc_round = self.gc_round();
616 let gc_log = format!("(gc = {gc_round})");
618
619 if self.contains_certificate(certificate.id()) {
621 bail!("Certificate for round {round} already exists in storage {gc_log}")
622 }
623
624 if self.contains_certificate_in_round_from(round, certificate.author()) {
626 bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
627 }
628
629 let Some(missing_transmissions) =
631 self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?
632 else {
633 bail!("Certificate for round {round} already exists in storage {gc_log}")
634 };
635
636 check_timestamp_for_liveness(certificate.timestamp())?;
638
639 let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
641 bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
642 };
643
644 let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1);
646 signers.insert(certificate.author());
648
649 for signature in certificate.signatures() {
651 let signer = signature.to_address();
653 if !committee_lookback.is_committee_member(signer) {
655 bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
656 }
657 signers.insert(signer);
659 }
660
661 if !committee_lookback.is_quorum_threshold_reached(&signers) {
663 bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
664 }
665
666 Ok(missing_transmissions)
667 }
668
669 pub fn insert_certificate(
687 &self,
688 certificate: BatchCertificate<N>,
689 transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
690 aborted_transmissions: HashSet<TransmissionID<N>>,
691 ) -> Result<()> {
692 ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
694 let missing_transmissions =
696 self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
697 self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
699 Ok(())
700 }
701
702 fn insert_certificate_atomic(
708 &self,
709 certificate: BatchCertificate<N>,
710 aborted_transmission_ids: HashSet<TransmissionID<N>>,
711 missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
712 ) {
713 let round = certificate.round();
715 let certificate_id = certificate.id();
717 let author = certificate.author();
719
720 self.rounds.write().entry(round).or_default().insert((certificate_id, author));
722 let transmission_ids = certificate.transmission_ids().clone();
724 self.certificates.write().insert(certificate_id, certificate);
726 self.unprocessed_certificates.write().pop(&certificate_id);
728 self.batch_ids.write().insert(certificate_id, round);
730 self.transmissions.insert_transmissions(
732 certificate_id,
733 transmission_ids,
734 aborted_transmission_ids,
735 missing_transmissions,
736 );
737 }
738
739 pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
743 ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
745 self.unprocessed_certificates.write().put(certificate.id(), certificate);
747
748 Ok(())
749 }
750
751 fn remove_certificate(&self, certificate_id: Field<N>) -> bool {
758 let Some(certificate) = self.get_certificate(certificate_id) else {
760 warn!("Certificate {certificate_id} does not exist in storage");
761 return false;
762 };
763 let round = certificate.round();
765 let author = certificate.author();
767
768 match self.rounds.write().entry(round) {
774 Entry::Occupied(mut entry) => {
775 entry.get_mut().swap_remove(&(certificate_id, author));
777 if entry.get().is_empty() {
779 entry.swap_remove();
780 }
781 }
782 Entry::Vacant(_) => {}
783 }
784 self.certificates.write().swap_remove(&certificate_id);
786 self.unprocessed_certificates.write().pop(&certificate_id);
788 self.batch_ids.write().swap_remove(&certificate_id);
790 self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids());
792 true
794 }
795}
796
797impl<N: Network> Storage<N> {
798 pub(crate) fn sync_height_with_block(&self, next_height: u32) {
800 if next_height > self.current_height() {
802 self.current_height.store(next_height, Ordering::SeqCst);
804 }
805 }
806
807 pub(crate) fn sync_round_with_block(&self, next_round: u64) {
809 let next_round = next_round.max(1);
811 if next_round > self.current_round() {
813 self.update_current_round(next_round);
815 info!("Synced to round {next_round}...");
817 } else {
818 trace!(
819 "Skipping sync to round {next_round} as it is less than or equal to the current round ({})",
820 self.current_round()
821 );
822 }
823 }
824
825 pub(crate) fn sync_certificate_with_block(
827 &self,
828 block: &Block<N>,
829 certificate: BatchCertificate<N>,
830 unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>,
831 trusted_ledger_certificate: bool,
832 ) -> Result<()> {
833 let gc_round = self.gc_round();
835 if certificate.round() <= gc_round {
836 trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round());
837 return Ok(());
838 }
839
840 if self.contains_certificate(certificate.id()) {
842 trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round());
843 return Ok(());
844 }
845 let mut missing_transmissions = HashMap::new();
847
848 let mut aborted_transmissions = HashSet::new();
850
851 let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect();
853 let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect();
854
855 for transmission_id in certificate.transmission_ids() {
857 if missing_transmissions.contains_key(transmission_id) {
859 continue;
860 }
861 if self.contains_transmission(*transmission_id) {
863 continue;
864 }
865 match transmission_id {
867 TransmissionID::Ratification => (),
868 TransmissionID::Solution(solution_id, _) => {
869 if let Some(solution) = block.get_solution(solution_id) {
876 missing_transmissions.insert(*transmission_id, (*solution).into());
877 } else if let Ok(Some(solution)) = self.ledger.get_solution(solution_id) {
878 missing_transmissions.insert(*transmission_id, solution.into());
879 } else if aborted_solutions.contains(solution_id)
880 || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
881 {
882 aborted_transmissions.insert(*transmission_id);
883 } else {
884 bail!("Missing solution {solution_id} for block {}", block.height());
885 }
886 }
887 TransmissionID::Transaction(transaction_id, _) => {
888 if let Some(transaction) = unconfirmed_transactions.get(transaction_id) {
895 missing_transmissions.insert(*transmission_id, transaction.clone().into());
896 } else if let Ok(Some(transaction)) = self.ledger.get_unconfirmed_transaction(*transaction_id) {
897 missing_transmissions.insert(*transmission_id, transaction.into());
898 } else if aborted_transactions.contains(transaction_id)
899 || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
900 {
901 aborted_transmissions.insert(*transmission_id);
902 } else {
903 bail!("Missing transaction {transaction_id} for block {}", block.height());
904 }
905 }
906 }
907 }
908 let certificate_id = fmt_id(certificate.id());
910 debug!(
911 "Syncing certificate '{certificate_id}' for round {} with {} transmissions",
912 certificate.round(),
913 certificate.transmission_ids().len()
914 );
915
916 if trusted_ledger_certificate {
917 self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
918 Ok(())
919 } else {
920 self.insert_certificate(certificate, missing_transmissions, aborted_transmissions).with_context(|| {
921 format!("Failed to insert certificate '{certificate_id}' from block {}", block.height())
922 })
923 }
924 }
925}
926
#[cfg(test)]
impl<N: Network> Storage<N> {
    /// Returns the ledger service.
    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        let ledger = &self.ledger;
        ledger
    }

    /// Returns an iterator over a snapshot of the `(round, {(certificate ID, author)})` entries.
    pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> {
        let snapshot = self.rounds.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over a snapshot of the `(certificate ID, certificate)` entries.
    pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
        let snapshot = self.certificates.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over a snapshot of the `(batch ID, round)` entries.
    pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
        let snapshot = self.batch_ids.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))`
    /// entries reported by the transmission storage service.
    pub fn transmissions_iter(
        &self,
    ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
        let snapshot = self.transmissions.as_hashmap();
        snapshot.into_iter()
    }

    /// Inserts the given certificate into storage without validation, filling in an empty
    /// transaction buffer for each of its transmission IDs.
    ///
    /// Unlike `insert_certificate_atomic`, this does not touch the unprocessed cache.
    /// This is for testing only.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
        // Capture everything needed from the certificate before it is moved into the map.
        let round = certificate.round();
        let certificate_id = certificate.id();
        let author = certificate.author();
        let transmission_ids = certificate.transmission_ids().clone();

        {
            let mut rounds = self.rounds.write();
            rounds.entry(round).or_default().insert((certificate_id, author));
        }
        {
            let mut certificates = self.certificates.write();
            certificates.insert(certificate_id, certificate);
        }
        {
            let mut batch_ids = self.batch_ids.write();
            batch_ids.insert(certificate_id, round);
        }

        // Placeholder content: one empty transaction buffer per transmission ID.
        let mut missing_transmissions = HashMap::new();
        for id in transmission_ids.iter() {
            let placeholder = Transmission::Transaction(snarkvm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()));
            missing_transmissions.insert(*id, placeholder);
        }

        self.transmissions.insert_transmissions(
            certificate_id,
            transmission_ids,
            Default::default(),
            missing_transmissions,
        );
    }
}
992
993#[cfg(test)]
994pub(crate) mod tests {
995 use super::*;
996 use snarkos_node_bft_ledger_service::MockLedgerService;
997 use snarkos_node_bft_storage_service::BFTMemoryService;
998 use snarkvm::{
999 ledger::narwhal::{Data, batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee},
1000 prelude::{Rng, TestRng},
1001 };
1002
1003 use ::bytes::Bytes;
1004 use indexmap::indexset;
1005
1006 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1007
1008 pub fn assert_storage<N: Network>(
1010 storage: &Storage<N>,
1011 rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
1012 certificates: &[(Field<N>, BatchCertificate<N>)],
1013 batch_ids: &[(Field<N>, u64)],
1014 transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
1015 ) {
1016 assert_eq!(storage.rounds_iter().collect::<Vec<_>>(), *rounds);
1018 assert_eq!(storage.certificates_iter().collect::<Vec<_>>(), *certificates);
1020 assert_eq!(storage.batch_ids_iter().collect::<Vec<_>>(), *batch_ids);
1022 assert_eq!(storage.transmissions_iter().collect::<HashMap<_, _>>(), *transmissions);
1024 }
1025
1026 fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
1028 let s = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
1030 let t =
1032 |rng: &mut TestRng| Data::Buffer(Bytes::from((0..2048).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
1033 match rng.random::<bool>() {
1035 true => Transmission::Solution(s(rng)),
1036 false => Transmission::Transaction(t(rng)),
1037 }
1038 }
1039
1040 pub(crate) fn sample_transmissions(
1042 certificate: &BatchCertificate<CurrentNetwork>,
1043 rng: &mut TestRng,
1044 ) -> (
1045 HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
1046 HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
1047 ) {
1048 let certificate_id = certificate.id();
1050
1051 let mut missing_transmissions = HashMap::new();
1052 let mut transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1053 for transmission_id in certificate.transmission_ids() {
1054 let transmission = sample_transmission(rng);
1056 missing_transmissions.insert(*transmission_id, transmission.clone());
1058 transmissions
1060 .entry(*transmission_id)
1061 .or_insert((transmission, Default::default()))
1062 .1
1063 .insert(certificate_id);
1064 }
1065 (missing_transmissions, transmissions)
1066 }
1067
1068 #[test]
1071 fn test_certificate_insert_remove() {
1072 let rng = &mut TestRng::default();
1073
1074 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1076 let ledger = Arc::new(MockLedgerService::new(committee));
1078 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1080
1081 assert_storage(&storage, &[], &[], &[], &Default::default());
1083
1084 let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1086 let certificate_id = certificate.id();
1088 let round = certificate.round();
1090 let author = certificate.author();
1092
1093 let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1095
1096 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
1098 assert!(storage.contains_certificate(certificate_id));
1100 assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1102 assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));
1104
1105 {
1107 let rounds = [(round, indexset! { (certificate_id, author) })];
1109 let certificates = [(certificate_id, certificate.clone())];
1111 let batch_ids = [(certificate_id, round)];
1113 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1115 }
1116
1117 let candidate_certificate = storage.get_certificate(certificate_id).unwrap();
1119 assert_eq!(certificate, candidate_certificate);
1121
1122 assert!(storage.remove_certificate(certificate_id));
1124 assert!(!storage.contains_certificate(certificate_id));
1126 assert!(storage.get_certificates_for_round(round).is_empty());
1128 assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);
1130 assert_storage(&storage, &[], &[], &[], &Default::default());
1132 }
1133
1134 #[test]
1135 fn test_certificate_duplicate() {
1136 let rng = &mut TestRng::default();
1137
1138 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1140 let ledger = Arc::new(MockLedgerService::new(committee));
1142 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1144
1145 assert_storage(&storage, &[], &[], &[], &Default::default());
1147
1148 let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1150 let certificate_id = certificate.id();
1152 let round = certificate.round();
1154 let author = certificate.author();
1156
1157 let rounds = [(round, indexset! { (certificate_id, author) })];
1159 let certificates = [(certificate_id, certificate.clone())];
1161 let batch_ids = [(certificate_id, round)];
1163 let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1165
1166 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1168 assert!(storage.contains_certificate(certificate_id));
1170 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1172
1173 storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1175 assert!(storage.contains_certificate(certificate_id));
1177 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1179
1180 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1182 assert!(storage.contains_certificate(certificate_id));
1184 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1186 }
1187
1188 #[test]
1192 fn test_certificate_insert_with_aborted_transmissions() {
1193 use std::collections::HashSet;
1194
1195 let rng = &mut TestRng::default();
1196
1197 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1198 let ledger = Arc::new(MockLedgerService::new(committee));
1199 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1200
1201 let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1202 let certificate_id = certificate.id();
1203 let round = certificate.round();
1204 let transmission_ids: Vec<_> = certificate.transmission_ids().iter().copied().collect();
1205
1206 if transmission_ids.len() < 2 {
1207 let (missing_transmissions, _) = sample_transmissions(&certificate, rng);
1209 storage.insert_certificate_atomic(certificate.clone(), HashSet::new(), missing_transmissions);
1210 for id in certificate.transmission_ids() {
1211 assert!(storage.contains_transmission(*id));
1212 }
1213 return;
1214 }
1215
1216 let (all_missing, _) = sample_transmissions(&certificate, rng);
1217 let aborted_id = transmission_ids[0];
1218 let aborted_transmission_ids: HashSet<_> = [aborted_id].into_iter().collect();
1219 let mut missing_transmissions = all_missing;
1220 missing_transmissions.remove(&aborted_id);
1221
1222 storage.insert_certificate_atomic(certificate.clone(), aborted_transmission_ids, missing_transmissions);
1223
1224 assert!(storage.contains_certificate(certificate_id));
1225 assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1226
1227 for id in certificate.transmission_ids() {
1229 assert!(
1230 storage.contains_transmission(*id),
1231 "contains_transmission should be true for all transmission IDs including aborted {id:?}"
1232 );
1233 }
1234
1235 assert!(
1237 storage.get_transmission(aborted_id).is_none(),
1238 "Aborted transmission should not have content in storage"
1239 );
1240 for id in transmission_ids.iter().skip(1) {
1241 assert!(
1242 storage.get_transmission(*id).is_some(),
1243 "Non-aborted transmission {id:?} should have content in storage"
1244 );
1245 }
1246 }
1247
1248 #[test]
1250 fn test_valid_incoming_certificate() {
1251 let rng = &mut TestRng::default();
1252
1253 let (committee, private_keys) =
1255 snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
1256 let ledger = Arc::new(MockLedgerService::new(committee));
1258 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1260
1261 let mut previous_certs = IndexSet::default();
1263
1264 for round in 1..=100 {
1265 let mut new_certs = IndexSet::default();
1266
1267 for private_key in private_keys.iter() {
1269 let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1270
1271 let certificate = sample_batch_certificate_for_round_with_committee(
1272 round,
1273 previous_certs.clone(),
1274 private_key,
1275 &other_keys,
1276 rng,
1277 );
1278 storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1279 new_certs.insert(certificate.id());
1280
1281 let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1283 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1284 }
1285
1286 previous_certs = new_certs;
1287 }
1288 }
1289
1290 #[test]
1292 fn test_invalid_incoming_certificate_missing_signature() {
1293 let rng = &mut TestRng::default();
1294
1295 let (committee, private_keys) =
1297 snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1298 let ledger = Arc::new(MockLedgerService::new(committee));
1300 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1302
1303 let mut previous_certs = IndexSet::default();
1305
1306 for round in 1..=5 {
1307 let mut new_certs = IndexSet::default();
1308
1309 for private_key in private_keys.iter() {
1311 if round < 5 {
1312 let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1313
1314 let certificate = sample_batch_certificate_for_round_with_committee(
1315 round,
1316 previous_certs.clone(),
1317 private_key,
1318 &other_keys,
1319 rng,
1320 );
1321 storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1322 new_certs.insert(certificate.id());
1323
1324 let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1326 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1327 } else {
1328 let other_keys: Vec<_> = private_keys[0..=3].iter().cloned().filter(|k| k != private_key).collect();
1330
1331 let certificate = sample_batch_certificate_for_round_with_committee(
1332 round,
1333 previous_certs.clone(),
1334 private_key,
1335 &other_keys,
1336 rng,
1337 );
1338 assert!(storage.check_incoming_certificate(&certificate).is_err());
1339 }
1340 }
1341
1342 previous_certs = new_certs;
1343 }
1344 }
1345
1346 #[test]
1348 fn test_invalid_certificate_insufficient_previous_certs() {
1349 let rng = &mut TestRng::default();
1350
1351 let (committee, private_keys) =
1353 snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1354 let ledger = Arc::new(MockLedgerService::new(committee));
1356 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1358
1359 let mut previous_certs = IndexSet::default();
1361
1362 for round in 1..=6 {
1363 let mut new_certs = IndexSet::default();
1364
1365 for private_key in private_keys.iter() {
1367 let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1368
1369 let certificate = sample_batch_certificate_for_round_with_committee(
1370 round,
1371 previous_certs.clone(),
1372 private_key,
1373 &other_keys,
1374 rng,
1375 );
1376
1377 let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1379 let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1380
1381 if round <= 5 {
1382 new_certs.insert(certificate.id());
1383 storage
1384 .insert_certificate(certificate, transmissions, Default::default())
1385 .expect("Valid certificate rejected");
1386 } else {
1387 assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1388 }
1389 }
1390
1391 if round < 5 {
1392 previous_certs = new_certs;
1393 } else {
1394 previous_certs = new_certs.into_iter().skip(6).collect();
1396 }
1397 }
1398 }
1399
1400 #[test]
1402 fn test_invalid_certificate_wrong_round_number() {
1403 let rng = &mut TestRng::default();
1404
1405 let (committee, private_keys) =
1407 snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1408 let ledger = Arc::new(MockLedgerService::new(committee));
1410 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1412
1413 let mut previous_certs = IndexSet::default();
1415
1416 for round in 1..=6 {
1417 let mut new_certs = IndexSet::default();
1418
1419 for private_key in private_keys.iter() {
1421 let cert_round = round.min(5); let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1423
1424 let certificate = sample_batch_certificate_for_round_with_committee(
1425 cert_round,
1426 previous_certs.clone(),
1427 private_key,
1428 &other_keys,
1429 rng,
1430 );
1431
1432 let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1434 let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1435
1436 if round <= 5 {
1437 new_certs.insert(certificate.id());
1438 storage
1439 .insert_certificate(certificate, transmissions, Default::default())
1440 .expect("Valid certificate rejected");
1441 } else {
1442 assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1443 }
1444 }
1445
1446 if round < 5 {
1447 previous_certs = new_certs;
1448 } else {
1449 previous_certs = new_certs.into_iter().skip(6).collect();
1451 }
1452 }
1453 }
1454}
1455
1456#[cfg(test)]
1457pub mod prop_tests {
1458 use super::*;
1459 use crate::helpers::{now, storage::tests::assert_storage};
1460 use snarkos_node_bft_events::committee_prop_tests::{CommitteeContext, ValidatorSet};
1461 use snarkos_node_bft_ledger_service::MockLedgerService;
1462 use snarkos_node_bft_storage_service::BFTMemoryService;
1463 use snarkvm::{
1464 ledger::{
1465 narwhal::{BatchHeader, Data},
1466 puzzle::SolutionID,
1467 },
1468 prelude::{Signature, Uniform},
1469 };
1470
1471 use ::bytes::Bytes;
1472 use indexmap::indexset;
1473 use proptest::{
1474 collection,
1475 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any},
1476 prop_oneof,
1477 sample::{Selector, size_range},
1478 };
1479 use rand::{CryptoRng, SeedableRng, TryCryptoRng, TryRng};
1480 use rand_chacha::ChaChaRng;
1481 use std::fmt::Debug;
1482 use test_strategy::proptest;
1483
1484 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1485
1486 impl Arbitrary for Storage<CurrentNetwork> {
1487 type Parameters = CommitteeContext;
1488 type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;
1489
1490 fn arbitrary() -> Self::Strategy {
1491 (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1492 .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1493 let ledger = Arc::new(MockLedgerService::new(committee));
1494 Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds).unwrap()
1495 })
1496 .boxed()
1497 }
1498
1499 fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
1500 (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1501 .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1502 let ledger = Arc::new(MockLedgerService::new(committee));
1503 Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds).unwrap()
1504 })
1505 .boxed()
1506 }
1507 }
1508
1509 #[derive(Debug)]
1512 pub struct CryptoTestRng(ChaChaRng);
1513
1514 impl Arbitrary for CryptoTestRng {
1515 type Parameters = ();
1516 type Strategy = BoxedStrategy<CryptoTestRng>;
1517
1518 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1519 use proptest::prelude::RngCore as ProptestRngCore;
1520 Just(0).prop_perturb(|_, mut rng| CryptoTestRng(ChaChaRng::seed_from_u64(rng.next_u64()))).boxed()
1521 }
1522 }
1523
1524 impl TryRng for CryptoTestRng {
1525 type Error = core::convert::Infallible;
1526
1527 fn try_next_u32(&mut self) -> Result<u32, Self::Error> {
1528 TryRng::try_next_u32(&mut self.0)
1529 }
1530
1531 fn try_next_u64(&mut self) -> Result<u64, Self::Error> {
1532 TryRng::try_next_u64(&mut self.0)
1533 }
1534
1535 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), Self::Error> {
1536 TryRng::try_fill_bytes(&mut self.0, dest)
1537 }
1538 }
1539
1540 impl TryCryptoRng for CryptoTestRng {}
1541
1542 #[derive(Debug, Clone)]
1543 pub struct AnyTransmission(pub Transmission<CurrentNetwork>);
1544
1545 impl Arbitrary for AnyTransmission {
1546 type Parameters = ();
1547 type Strategy = BoxedStrategy<AnyTransmission>;
1548
1549 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1550 any_transmission().prop_map(AnyTransmission).boxed()
1551 }
1552 }
1553
1554 #[derive(Debug, Clone)]
1555 pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>);
1556
1557 impl Arbitrary for AnyTransmissionID {
1558 type Parameters = ();
1559 type Strategy = BoxedStrategy<AnyTransmissionID>;
1560
1561 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1562 any_transmission_id().prop_map(AnyTransmissionID).boxed()
1563 }
1564 }
1565
1566 fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> {
1567 prop_oneof![
1568 (collection::vec(any::<u8>(), 512..=512))
1569 .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))),
1570 (collection::vec(any::<u8>(), 2048..=2048))
1571 .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))),
1572 ]
1573 .boxed()
1574 }
1575
1576 pub fn any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> {
1577 any::<u64>().prop_map(|x| x.into()).boxed()
1578 }
1579
1580 pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> {
1581 any::<u64>()
1582 .prop_map(|seed| {
1583 let rng = &mut ChaChaRng::seed_from_u64(seed);
1584 <CurrentNetwork as Network>::TransactionID::from(Field::rand(rng))
1585 })
1586 .boxed()
1587 }
1588
1589 pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> {
1590 prop_oneof![
1591 (any_transaction_id(), any::<<CurrentNetwork as Network>::TransmissionChecksum>())
1592 .prop_map(|(id, cs)| TransmissionID::Transaction(id, cs)),
1593 (any_solution_id(), any::<<CurrentNetwork as Network>::TransmissionChecksum>())
1594 .prop_map(|(id, cs)| TransmissionID::Solution(id, cs)),
1595 ]
1596 .boxed()
1597 }
1598
1599 pub fn sign_batch_header<R: CryptoRng>(
1600 validator_set: &ValidatorSet,
1601 batch_header: &BatchHeader<CurrentNetwork>,
1602 rng: &mut R,
1603 ) -> IndexSet<Signature<CurrentNetwork>> {
1604 let mut signatures = IndexSet::with_capacity(validator_set.0.len());
1605 for validator in validator_set.0.iter() {
1606 let private_key = validator.private_key;
1607 signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap());
1608 }
1609 signatures
1610 }
1611
1612 #[proptest]
1613 fn test_certificate_duplicate(
1614 context: CommitteeContext,
1615 #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>,
1616 mut rng: CryptoTestRng,
1617 selector: Selector,
1618 ) {
1619 let CommitteeContext(committee, ValidatorSet(validators)) = context;
1620 let committee_id = committee.id();
1621
1622 let ledger = Arc::new(MockLedgerService::new(committee));
1624 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1625
1626 assert_storage(&storage, &[], &[], &[], &Default::default());
1628
1629 let signer = selector.select(&validators);
1631
1632 let mut transmission_map = IndexMap::new();
1633
1634 for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() {
1635 transmission_map.insert(*id, t.clone());
1636 }
1637
1638 let batch_header = BatchHeader::new(
1639 &signer.private_key,
1640 0,
1641 now(),
1642 committee_id,
1643 transmission_map.keys().cloned().collect(),
1644 Default::default(),
1645 &mut rng,
1646 )
1647 .unwrap();
1648
1649 let mut validators = validators.clone();
1652 validators.remove(signer);
1653
1654 let certificate = BatchCertificate::from(
1655 batch_header.clone(),
1656 sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng),
1657 )
1658 .unwrap();
1659
1660 let certificate_id = certificate.id();
1662 let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1663 for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() {
1664 internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id);
1665 }
1666
1667 let round = certificate.round();
1669 let author = certificate.author();
1671
1672 let rounds = [(round, indexset! { (certificate_id, author) })];
1674 let certificates = [(certificate_id, certificate.clone())];
1676 let batch_ids = [(certificate_id, round)];
1678
1679 let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> =
1681 transmission_map.into_iter().collect();
1682 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1683 assert!(storage.contains_certificate(certificate_id));
1685 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1687
1688 storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1690 assert!(storage.contains_certificate(certificate_id));
1692 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1694
1695 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1697 assert!(storage.contains_certificate(certificate_id));
1699 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1701 }
1702}