1use crate::helpers::{check_timestamp_for_liveness, fmt_id};
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkos_node_bft_storage_service::StorageService;
19use snarkvm::{
20 ledger::{
21 block::{Block, Transaction},
22 narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID},
23 },
24 prelude::{Address, Field, Network, Result, anyhow, bail, ensure},
25 utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by},
26};
27
28use indexmap::{IndexMap, IndexSet, map::Entry};
29#[cfg(feature = "locktick")]
30use locktick::parking_lot::RwLock;
31use lru::LruCache;
32#[cfg(not(feature = "locktick"))]
33use parking_lot::RwLock;
34#[cfg(not(feature = "serial"))]
35use rayon::prelude::*;
36use std::{
37 collections::{HashMap, HashSet},
38 num::NonZeroUsize,
39 sync::{
40 Arc,
41 atomic::{AtomicU32, AtomicU64, Ordering},
42 },
43};
44
45#[derive(Clone, Debug)]
46pub struct Storage<N: Network>(Arc<StorageInner<N>>);
47
48impl<N: Network> std::ops::Deref for Storage<N> {
49 type Target = Arc<StorageInner<N>>;
50
51 fn deref(&self) -> &Self::Target {
52 &self.0
53 }
54}
55
56#[derive(Debug)]
76pub struct StorageInner<N: Network> {
77 ledger: Arc<dyn LedgerService<N>>,
79 current_height: AtomicU32,
82 current_round: AtomicU64,
91 gc_round: AtomicU64,
93 max_gc_rounds: u64,
95 rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>,
98 unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
100 certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
102 batch_ids: RwLock<IndexMap<Field<N>, u64>>,
104 transmissions: Arc<dyn StorageService<N>>,
106}
107
108impl<N: Network> Storage<N> {
109 pub fn new(
111 ledger: Arc<dyn LedgerService<N>>,
112 transmissions: Arc<dyn StorageService<N>>,
113 max_gc_rounds: u64,
114 ) -> Self {
115 let committee = ledger.current_committee().expect("Ledger is missing a committee.");
118 let current_round = committee.starting_round().max(1);
120 let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES().unwrap() * 2) as usize).unwrap();
122
123 let storage = Self(Arc::new(StorageInner {
125 ledger,
126 current_height: Default::default(),
127 current_round: Default::default(),
128 gc_round: Default::default(),
129 max_gc_rounds,
130 rounds: Default::default(),
131 unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
132 certificates: Default::default(),
133 batch_ids: Default::default(),
134 transmissions,
135 }));
136 storage.update_current_round(current_round);
138 storage.garbage_collect_certificates(current_round);
141 storage
143 }
144}
145
146impl<N: Network> Storage<N> {
147 pub fn current_height(&self) -> u32 {
149 self.current_height.load(Ordering::SeqCst)
151 }
152}
153
154impl<N: Network> Storage<N> {
155 pub fn current_round(&self) -> u64 {
157 self.current_round.load(Ordering::SeqCst)
159 }
160
161 pub fn gc_round(&self) -> u64 {
163 self.gc_round.load(Ordering::SeqCst)
165 }
166
167 pub fn max_gc_rounds(&self) -> u64 {
169 self.max_gc_rounds
170 }
171
172 pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> {
175 let next_round = current_round + 1;
177
178 {
180 let storage_round = self.current_round();
182 if next_round < storage_round {
184 return Ok(storage_round);
185 }
186 }
187
188 let current_committee = self.ledger.current_committee()?;
190 let starting_round = current_committee.starting_round();
192 if next_round < starting_round {
194 let latest_block_round = self.ledger.latest_round();
196 info!(
198 "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..."
199 );
200 self.sync_round_with_block(latest_block_round);
202 return Ok(latest_block_round);
204 }
205
206 self.update_current_round(next_round);
208
209 #[cfg(feature = "metrics")]
210 metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64);
211
212 let storage_round = self.current_round();
214 let gc_round = self.gc_round();
216 ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})");
218 ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}");
220
221 info!("Starting round {next_round}...");
223 Ok(next_round)
224 }
225
226 fn update_current_round(&self, next_round: u64) {
228 self.current_round.store(next_round, Ordering::SeqCst);
230 }
231
232 pub(crate) fn garbage_collect_certificates(&self, next_round: u64) {
234 let current_gc_round = self.gc_round();
236 let next_gc_round = next_round.saturating_sub(self.max_gc_rounds);
238 if next_gc_round > current_gc_round {
240 for gc_round in current_gc_round..=next_gc_round {
242 for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
244 self.remove_certificate(id);
246 }
247 }
248 self.gc_round.store(next_gc_round, Ordering::SeqCst);
250 }
251 }
252}
253
254impl<N: Network> Storage<N> {
255 pub fn contains_certificates_for_round(&self, round: u64) -> bool {
257 self.rounds.read().contains_key(&round)
259 }
260
261 pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool {
263 self.certificates.read().contains_key(&certificate_id)
265 }
266
267 pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool {
269 self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author))
270 }
271
272 pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
274 self.unprocessed_certificates.read().contains(&certificate_id)
276 }
277
278 pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
280 self.batch_ids.read().contains_key(&batch_id)
282 }
283
284 pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
286 self.transmissions.contains_transmission(transmission_id.into())
287 }
288
289 pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
292 self.transmissions.get_transmission(transmission_id.into())
293 }
294
295 pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> {
298 self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
300 }
301
302 pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
305 self.batch_ids.read().get(&batch_id).copied()
307 }
308
309 pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> {
312 self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
314 }
315
316 pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
319 self.certificates.read().get(&certificate_id).cloned()
321 }
322
323 pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
326 self.unprocessed_certificates.read().peek(&certificate_id).cloned()
328 }
329
330 pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> {
334 if let Some(entries) = self.rounds.read().get(&round) {
336 let certificates = self.certificates.read();
337 entries.iter().find_map(
338 |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None },
339 )
340 } else {
341 Default::default()
342 }
343 }
344
345 pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> {
348 if round == 0 {
350 return Default::default();
351 }
352 if let Some(entries) = self.rounds.read().get(&round) {
354 let certificates = self.certificates.read();
355 entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect()
356 } else {
357 Default::default()
358 }
359 }
360
361 pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
364 if round == 0 {
366 return Default::default();
367 }
368 if let Some(entries) = self.rounds.read().get(&round) {
370 entries.iter().map(|(certificate_id, _)| *certificate_id).collect()
371 } else {
372 Default::default()
373 }
374 }
375
376 pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
379 if round == 0 {
381 return Default::default();
382 }
383 if let Some(entries) = self.rounds.read().get(&round) {
385 entries.iter().map(|(_, author)| *author).collect()
386 } else {
387 Default::default()
388 }
389 }
390
391 pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
394 let rounds = self.rounds.read();
396 let certificates = self.certificates.read();
397
398 cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b))
400 .flat_map(|(_, certificates_for_round)| {
401 cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| {
403 if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
405 None
406 } else {
407 certificates.get(&certificate_id).cloned()
409 }
410 })
411 })
412 .collect()
413 }
414
415 pub fn check_batch_header(
428 &self,
429 batch_header: &BatchHeader<N>,
430 transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
431 aborted_transmissions: HashSet<TransmissionID<N>>,
432 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
433 let round = batch_header.round();
435 let gc_round = self.gc_round();
437 let gc_log = format!("(gc = {gc_round})");
439
440 if self.contains_batch(batch_header.batch_id()) {
442 bail!("Batch for round {round} already exists in storage {gc_log}")
443 }
444
445 let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
447 bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}")
448 };
449 if !committee_lookback.is_committee_member(batch_header.author()) {
451 bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author())
452 }
453
454 check_timestamp_for_liveness(batch_header.timestamp())?;
456
457 let missing_transmissions = self
459 .transmissions
460 .find_missing_transmissions(batch_header, transmissions, aborted_transmissions)
461 .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?;
462
463 let previous_round = round.saturating_sub(1);
465 if previous_round > gc_round {
467 let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
469 bail!("Missing committee for the previous round {previous_round} in storage {gc_log}")
470 };
471 if !self.contains_certificates_for_round(previous_round) {
473 bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}")
474 }
475 if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() {
477 bail!("Too many previous certificates for round {round} {gc_log}")
478 }
479 let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len());
481 for previous_certificate_id in batch_header.previous_certificate_ids() {
483 let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
485 bail!(
486 "Missing previous certificate '{}' for certificate in round {round} {gc_log}",
487 fmt_id(previous_certificate_id)
488 )
489 };
490 if previous_certificate.round() != previous_round {
492 bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}")
493 }
494 if previous_authors.contains(&previous_certificate.author()) {
496 bail!("Round {round} certificate contains a duplicate author {gc_log}")
497 }
498 previous_authors.insert(previous_certificate.author());
500 }
501 if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) {
503 bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}")
504 }
505 }
506 Ok(missing_transmissions)
507 }
508
509 pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> {
521 let certificate_author = certificate.author();
523 let certificate_round = certificate.round();
524
525 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
527
528 let mut signers: HashSet<Address<N>> =
531 certificate.signatures().map(|signature| signature.to_address()).collect();
532 signers.insert(certificate_author);
533 ensure!(
534 committee_lookback.is_quorum_threshold_reached(&signers),
535 "Certificate '{}' for round {certificate_round} does not meet quorum requirements",
536 certificate.id()
537 );
538
539 cfg_iter!(&signers).try_for_each(|signer| {
541 ensure!(
542 committee_lookback.is_committee_member(*signer),
543 "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee",
544 certificate.id()
545 );
546 Ok(())
547 })?;
548
549 Ok(())
550 }
551
552 pub fn check_certificate(
568 &self,
569 certificate: &BatchCertificate<N>,
570 transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
571 aborted_transmissions: HashSet<TransmissionID<N>>,
572 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
573 let round = certificate.round();
575 let gc_round = self.gc_round();
577 let gc_log = format!("(gc = {gc_round})");
579
580 if self.contains_certificate(certificate.id()) {
582 bail!("Certificate for round {round} already exists in storage {gc_log}")
583 }
584
585 if self.contains_certificate_in_round_from(round, certificate.author()) {
587 bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
588 }
589
590 let missing_transmissions =
592 self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?;
593
594 check_timestamp_for_liveness(certificate.timestamp())?;
596
597 let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
599 bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
600 };
601
602 let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1);
604 signers.insert(certificate.author());
606
607 for signature in certificate.signatures() {
609 let signer = signature.to_address();
611 if !committee_lookback.is_committee_member(signer) {
613 bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
614 }
615 signers.insert(signer);
617 }
618
619 if !committee_lookback.is_quorum_threshold_reached(&signers) {
621 bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
622 }
623 Ok(missing_transmissions)
624 }
625
626 pub fn insert_certificate(
638 &self,
639 certificate: BatchCertificate<N>,
640 transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
641 aborted_transmissions: HashSet<TransmissionID<N>>,
642 ) -> Result<()> {
643 ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
645 let missing_transmissions =
647 self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
648 self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
650 Ok(())
651 }
652
653 fn insert_certificate_atomic(
659 &self,
660 certificate: BatchCertificate<N>,
661 aborted_transmission_ids: HashSet<TransmissionID<N>>,
662 missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
663 ) {
664 let round = certificate.round();
666 let certificate_id = certificate.id();
668 let author = certificate.author();
670
671 self.rounds.write().entry(round).or_default().insert((certificate_id, author));
673 let transmission_ids = certificate.transmission_ids().clone();
675 self.certificates.write().insert(certificate_id, certificate);
677 self.unprocessed_certificates.write().pop(&certificate_id);
679 self.batch_ids.write().insert(certificate_id, round);
681 self.transmissions.insert_transmissions(
683 certificate_id,
684 transmission_ids,
685 aborted_transmission_ids,
686 missing_transmissions,
687 );
688 }
689
690 pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
694 ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
696 self.unprocessed_certificates.write().put(certificate.id(), certificate);
698
699 Ok(())
700 }
701
702 fn remove_certificate(&self, certificate_id: Field<N>) -> bool {
709 let Some(certificate) = self.get_certificate(certificate_id) else {
711 warn!("Certificate {certificate_id} does not exist in storage");
712 return false;
713 };
714 let round = certificate.round();
716 let author = certificate.author();
718
719 match self.rounds.write().entry(round) {
725 Entry::Occupied(mut entry) => {
726 entry.get_mut().swap_remove(&(certificate_id, author));
728 if entry.get().is_empty() {
730 entry.swap_remove();
731 }
732 }
733 Entry::Vacant(_) => {}
734 }
735 self.certificates.write().swap_remove(&certificate_id);
737 self.unprocessed_certificates.write().pop(&certificate_id);
739 self.batch_ids.write().swap_remove(&certificate_id);
741 self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids());
743 true
745 }
746}
747
748impl<N: Network> Storage<N> {
749 pub(crate) fn sync_height_with_block(&self, next_height: u32) {
751 if next_height > self.current_height() {
753 self.current_height.store(next_height, Ordering::SeqCst);
755 }
756 }
757
758 pub(crate) fn sync_round_with_block(&self, next_round: u64) {
760 let next_round = next_round.max(1);
762 if next_round > self.current_round() {
764 self.update_current_round(next_round);
766 info!("Synced to round {next_round}...");
768 }
769 }
770
771 pub(crate) fn sync_certificate_with_block(
773 &self,
774 block: &Block<N>,
775 certificate: BatchCertificate<N>,
776 unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>,
777 ) {
778 let gc_round = self.gc_round();
780 if certificate.round() <= gc_round {
781 trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round());
782 return;
783 }
784
785 if self.contains_certificate(certificate.id()) {
787 trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round());
788 return;
789 }
790 let mut missing_transmissions = HashMap::new();
792
793 let mut aborted_transmissions = HashSet::new();
795
796 let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect();
798 let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect();
799
800 for transmission_id in certificate.transmission_ids() {
802 if missing_transmissions.contains_key(transmission_id) {
804 continue;
805 }
806 if self.contains_transmission(*transmission_id) {
808 continue;
809 }
810 match transmission_id {
812 TransmissionID::Ratification => (),
813 TransmissionID::Solution(solution_id, _) => {
814 match block.get_solution(solution_id) {
816 Some(solution) => missing_transmissions.insert(*transmission_id, (*solution).into()),
818 None => match self.ledger.get_solution(solution_id) {
820 Ok(solution) => missing_transmissions.insert(*transmission_id, solution.into()),
822 Err(_) => {
824 match aborted_solutions.contains(solution_id)
826 || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
827 {
828 true => {
829 aborted_transmissions.insert(*transmission_id);
830 }
831 false => error!("Missing solution {solution_id} in block {}", block.height()),
832 }
833 continue;
834 }
835 },
836 };
837 }
838 TransmissionID::Transaction(transaction_id, _) => {
839 match unconfirmed_transactions.get(transaction_id) {
841 Some(transaction) => missing_transmissions.insert(*transmission_id, transaction.clone().into()),
843 None => match self.ledger.get_unconfirmed_transaction(*transaction_id) {
845 Ok(transaction) => missing_transmissions.insert(*transmission_id, transaction.into()),
847 Err(_) => {
849 match aborted_transactions.contains(transaction_id)
851 || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
852 {
853 true => {
854 aborted_transmissions.insert(*transmission_id);
855 }
856 false => warn!("Missing transaction {transaction_id} in block {}", block.height()),
857 }
858 continue;
859 }
860 },
861 };
862 }
863 }
864 }
865 let certificate_id = fmt_id(certificate.id());
867 debug!(
868 "Syncing certificate '{certificate_id}' for round {} with {} transmissions",
869 certificate.round(),
870 certificate.transmission_ids().len()
871 );
872 if let Err(error) = self.insert_certificate(certificate, missing_transmissions, aborted_transmissions) {
873 error!("Failed to insert certificate '{certificate_id}' from block {} - {error}", block.height());
874 }
875 }
876}
877
#[cfg(test)]
impl<N: Network> Storage<N> {
    /// Returns the ledger service.
    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns an iterator over a snapshot of the `(round, (certificate ID, author) entries)` map.
    pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> {
        let snapshot = self.rounds.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over a snapshot of the `(certificate ID, certificate)` map.
    pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
        let snapshot = self.certificates.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over a snapshot of the `(batch ID, round)` map.
    pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
        let snapshot = self.batch_ids.read().clone();
        snapshot.into_iter()
    }

    /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))` entries
    /// reported by the transmission storage service.
    pub fn transmissions_iter(
        &self,
    ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
        let entries = self.transmissions.as_hashmap();
        entries.into_iter()
    }

    /// Inserts the given `certificate` into storage without any validation, backing each of
    /// its transmission IDs with an empty placeholder transaction buffer.
    ///
    /// Unlike `insert_certificate_atomic`, this leaves the unprocessed-certificate cache untouched.
    #[cfg(test)]
    #[doc(hidden)]
    pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
        let (round, certificate_id, author) = (certificate.round(), certificate.id(), certificate.author());

        // Record the `(certificate ID, author)` entry for the round.
        self.rounds.write().entry(round).or_default().insert((certificate_id, author));
        // Copy the transmission IDs before the certificate is moved into the map.
        let transmission_ids = certificate.transmission_ids().clone();
        // Store the certificate and its round.
        self.certificates.write().insert(certificate_id, certificate);
        self.batch_ids.write().insert(certificate_id, round);

        // Back every transmission ID with an empty transaction buffer.
        let mut missing_transmissions = HashMap::new();
        for id in transmission_ids.iter() {
            let placeholder = Transmission::Transaction(snarkvm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()));
            missing_transmissions.insert(*id, placeholder);
        }
        self.transmissions.insert_transmissions(
            certificate_id,
            transmission_ids,
            Default::default(),
            missing_transmissions,
        );
    }
}
943
944#[cfg(test)]
945pub(crate) mod tests {
946 use super::*;
947 use snarkos_node_bft_ledger_service::MockLedgerService;
948 use snarkos_node_bft_storage_service::BFTMemoryService;
949 use snarkvm::{
950 ledger::narwhal::{Data, batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee},
951 prelude::{Rng, TestRng},
952 };
953
954 use ::bytes::Bytes;
955 use indexmap::indexset;
956
957 type CurrentNetwork = snarkvm::prelude::MainnetV0;
958
959 pub fn assert_storage<N: Network>(
961 storage: &Storage<N>,
962 rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
963 certificates: &[(Field<N>, BatchCertificate<N>)],
964 batch_ids: &[(Field<N>, u64)],
965 transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
966 ) {
967 assert_eq!(storage.rounds_iter().collect::<Vec<_>>(), *rounds);
969 assert_eq!(storage.certificates_iter().collect::<Vec<_>>(), *certificates);
971 assert_eq!(storage.batch_ids_iter().collect::<Vec<_>>(), *batch_ids);
973 assert_eq!(storage.transmissions_iter().collect::<HashMap<_, _>>(), *transmissions);
975 }
976
977 fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
979 let s = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
981 let t = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..2048).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
983 match rng.r#gen::<bool>() {
985 true => Transmission::Solution(s(rng)),
986 false => Transmission::Transaction(t(rng)),
987 }
988 }
989
990 pub(crate) fn sample_transmissions(
992 certificate: &BatchCertificate<CurrentNetwork>,
993 rng: &mut TestRng,
994 ) -> (
995 HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
996 HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
997 ) {
998 let certificate_id = certificate.id();
1000
1001 let mut missing_transmissions = HashMap::new();
1002 let mut transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1003 for transmission_id in certificate.transmission_ids() {
1004 let transmission = sample_transmission(rng);
1006 missing_transmissions.insert(*transmission_id, transmission.clone());
1008 transmissions
1010 .entry(*transmission_id)
1011 .or_insert((transmission, Default::default()))
1012 .1
1013 .insert(certificate_id);
1014 }
1015 (missing_transmissions, transmissions)
1016 }
1017
1018 #[test]
1021 fn test_certificate_insert_remove() {
1022 let rng = &mut TestRng::default();
1023
1024 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1026 let ledger = Arc::new(MockLedgerService::new(committee));
1028 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1030
1031 assert_storage(&storage, &[], &[], &[], &Default::default());
1033
1034 let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1036 let certificate_id = certificate.id();
1038 let round = certificate.round();
1040 let author = certificate.author();
1042
1043 let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1045
1046 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
1048 assert!(storage.contains_certificate(certificate_id));
1050 assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1052 assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));
1054
1055 {
1057 let rounds = [(round, indexset! { (certificate_id, author) })];
1059 let certificates = [(certificate_id, certificate.clone())];
1061 let batch_ids = [(certificate_id, round)];
1063 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1065 }
1066
1067 let candidate_certificate = storage.get_certificate(certificate_id).unwrap();
1069 assert_eq!(certificate, candidate_certificate);
1071
1072 assert!(storage.remove_certificate(certificate_id));
1074 assert!(!storage.contains_certificate(certificate_id));
1076 assert!(storage.get_certificates_for_round(round).is_empty());
1078 assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);
1080 assert_storage(&storage, &[], &[], &[], &Default::default());
1082 }
1083
1084 #[test]
1085 fn test_certificate_duplicate() {
1086 let rng = &mut TestRng::default();
1087
1088 let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1090 let ledger = Arc::new(MockLedgerService::new(committee));
1092 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1094
1095 assert_storage(&storage, &[], &[], &[], &Default::default());
1097
1098 let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1100 let certificate_id = certificate.id();
1102 let round = certificate.round();
1104 let author = certificate.author();
1106
1107 let rounds = [(round, indexset! { (certificate_id, author) })];
1109 let certificates = [(certificate_id, certificate.clone())];
1111 let batch_ids = [(certificate_id, round)];
1113 let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1115
1116 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1118 assert!(storage.contains_certificate(certificate_id));
1120 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1122
1123 storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1125 assert!(storage.contains_certificate(certificate_id));
1127 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1129
1130 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1132 assert!(storage.contains_certificate(certificate_id));
1134 assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1136 }
1137
1138 #[test]
1140 fn test_valid_incoming_certificate() {
1141 let rng = &mut TestRng::default();
1142
1143 let (committee, private_keys) =
1145 snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
1146 let ledger = Arc::new(MockLedgerService::new(committee));
1148 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1150
1151 let mut previous_certs = IndexSet::default();
1153
1154 for round in 1..=100 {
1155 let mut new_certs = IndexSet::default();
1156
1157 for private_key in private_keys.iter() {
1159 let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1160
1161 let certificate = sample_batch_certificate_for_round_with_committee(
1162 round,
1163 previous_certs.clone(),
1164 private_key,
1165 &other_keys,
1166 rng,
1167 );
1168 storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1169 new_certs.insert(certificate.id());
1170
1171 let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1173 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1174 }
1175
1176 previous_certs = new_certs;
1177 }
1178 }
1179
1180 #[test]
1182 fn test_invalid_incoming_certificate_missing_signature() {
1183 let rng = &mut TestRng::default();
1184
1185 let (committee, private_keys) =
1187 snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1188 let ledger = Arc::new(MockLedgerService::new(committee));
1190 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1192
1193 let mut previous_certs = IndexSet::default();
1195
1196 for round in 1..=5 {
1197 let mut new_certs = IndexSet::default();
1198
1199 for private_key in private_keys.iter() {
1201 if round < 5 {
1202 let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1203
1204 let certificate = sample_batch_certificate_for_round_with_committee(
1205 round,
1206 previous_certs.clone(),
1207 private_key,
1208 &other_keys,
1209 rng,
1210 );
1211 storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1212 new_certs.insert(certificate.id());
1213
1214 let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1216 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1217 } else {
1218 let other_keys: Vec<_> = private_keys[0..=3].iter().cloned().filter(|k| k != private_key).collect();
1220
1221 let certificate = sample_batch_certificate_for_round_with_committee(
1222 round,
1223 previous_certs.clone(),
1224 private_key,
1225 &other_keys,
1226 rng,
1227 );
1228 assert!(storage.check_incoming_certificate(&certificate).is_err());
1229 }
1230 }
1231
1232 previous_certs = new_certs;
1233 }
1234 }
1235
/// Checks that `insert_certificate` accepts certificates for rounds 1 through 5 and
/// rejects the round-6 certificates, which reference only part of round 5.
#[test]
fn test_invalid_certificate_insufficient_previous_certs() {
    let rng = &mut TestRng::default();

    // Sample a committee (and its keys) and build storage with `max_gc_rounds = 1` on a mock ledger.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // Certificate IDs that the next round's certificates will reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=6 {
        // Certificate IDs accepted during this round.
        let mut new_certs = IndexSet::default();

        // Every key authors one certificate; all remaining keys act as co-signers.
        for private_key in private_keys.iter() {
            let other_keys: Vec<_> = private_keys.iter().filter(|key| *key != private_key).cloned().collect();

            let certificate = sample_batch_certificate_for_round_with_committee(
                round,
                previous_certs.clone(),
                private_key,
                &other_keys,
                rng,
            );

            // Strip the bookkeeping set so that only `id -> transmission` pairs remain.
            let (_, transmissions) = sample_transmissions(&certificate, rng);
            let transmissions = transmissions.into_iter().map(|(id, (transmission, _))| (id, transmission)).collect();

            // Round 6 is the negative case: the insertion has to fail.
            if round > 5 {
                assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
                continue;
            }

            new_certs.insert(certificate.id());
            storage
                .insert_certificate(certificate, transmissions, Default::default())
                .expect("Valid certificate rejected");
        }

        // From round 5 onwards, drop the first six certificate IDs so that the following
        // round only references the remainder.
        previous_certs = if round < 5 { new_certs } else { new_certs.into_iter().skip(6).collect() };
    }
}
1289
/// Checks that `insert_certificate` accepts certificates for rounds 1 through 5 and
/// rejects the certificates produced in the sixth iteration, which are stamped with
/// round 5 again instead of round 6.
#[test]
fn test_invalid_certificate_wrong_round_number() {
    let rng = &mut TestRng::default();

    // Sample a committee (and its keys) and build storage with `max_gc_rounds = 1` on a mock ledger.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // Certificate IDs that the next round's certificates will reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=6 {
        // Certificate IDs accepted during this round.
        let mut new_certs = IndexSet::default();

        // Every key authors one certificate; all remaining keys act as co-signers.
        for private_key in private_keys.iter() {
            // The round written into the certificate is capped at 5, so the sixth
            // iteration produces certificates carrying a stale round number.
            let cert_round = std::cmp::min(round, 5);
            let other_keys: Vec<_> = private_keys.iter().filter(|key| *key != private_key).cloned().collect();

            let certificate = sample_batch_certificate_for_round_with_committee(
                cert_round,
                previous_certs.clone(),
                private_key,
                &other_keys,
                rng,
            );

            // Strip the bookkeeping set so that only `id -> transmission` pairs remain.
            let (_, transmissions) = sample_transmissions(&certificate, rng);
            let transmissions = transmissions.into_iter().map(|(id, (transmission, _))| (id, transmission)).collect();

            // The sixth iteration is the negative case: the insertion has to fail.
            if round > 5 {
                assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
                continue;
            }

            new_certs.insert(certificate.id());
            storage
                .insert_certificate(certificate, transmissions, Default::default())
                .expect("Valid certificate rejected");
        }

        // From round 5 onwards, drop the first six certificate IDs before handing the
        // set to the next iteration.
        previous_certs = if round < 5 { new_certs } else { new_certs.into_iter().skip(6).collect() };
    }
}
1344}
1345
1346#[cfg(test)]
1347pub mod prop_tests {
1348 use super::*;
1349 use crate::helpers::{now, storage::tests::assert_storage};
1350 use snarkos_node_bft_ledger_service::MockLedgerService;
1351 use snarkos_node_bft_storage_service::BFTMemoryService;
1352 use snarkvm::{
1353 ledger::{
1354 committee::prop_tests::{CommitteeContext, ValidatorSet},
1355 narwhal::{BatchHeader, Data},
1356 puzzle::SolutionID,
1357 },
1358 prelude::{Signature, Uniform},
1359 };
1360
1361 use ::bytes::Bytes;
1362 use indexmap::indexset;
1363 use proptest::{
1364 collection,
1365 prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any},
1366 prop_oneof,
1367 sample::{Selector, size_range},
1368 test_runner::TestRng,
1369 };
1370 use rand::{CryptoRng, Error, Rng, RngCore};
1371 use std::fmt::Debug;
1372 use test_strategy::proptest;
1373
1374 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1375
1376 impl Arbitrary for Storage<CurrentNetwork> {
1377 type Parameters = CommitteeContext;
1378 type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;
1379
1380 fn arbitrary() -> Self::Strategy {
1381 (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1382 .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1383 let ledger = Arc::new(MockLedgerService::new(committee));
1384 Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1385 })
1386 .boxed()
1387 }
1388
1389 fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
1390 (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1391 .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1392 let ledger = Arc::new(MockLedgerService::new(committee));
1393 Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1394 })
1395 .boxed()
1396 }
1397 }
1398
1399 #[derive(Debug)]
1401 pub struct CryptoTestRng(TestRng);
1402
1403 impl Arbitrary for CryptoTestRng {
1404 type Parameters = ();
1405 type Strategy = BoxedStrategy<CryptoTestRng>;
1406
1407 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1408 Just(0).prop_perturb(|_, rng| CryptoTestRng(rng)).boxed()
1409 }
1410 }
1411 impl RngCore for CryptoTestRng {
1412 fn next_u32(&mut self) -> u32 {
1413 self.0.next_u32()
1414 }
1415
1416 fn next_u64(&mut self) -> u64 {
1417 self.0.next_u64()
1418 }
1419
1420 fn fill_bytes(&mut self, dest: &mut [u8]) {
1421 self.0.fill_bytes(dest);
1422 }
1423
1424 fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
1425 self.0.try_fill_bytes(dest)
1426 }
1427 }
1428
1429 impl CryptoRng for CryptoTestRng {}
1430
1431 #[derive(Debug, Clone)]
1432 pub struct AnyTransmission(pub Transmission<CurrentNetwork>);
1433
1434 impl Arbitrary for AnyTransmission {
1435 type Parameters = ();
1436 type Strategy = BoxedStrategy<AnyTransmission>;
1437
1438 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1439 any_transmission().prop_map(AnyTransmission).boxed()
1440 }
1441 }
1442
1443 #[derive(Debug, Clone)]
1444 pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>);
1445
1446 impl Arbitrary for AnyTransmissionID {
1447 type Parameters = ();
1448 type Strategy = BoxedStrategy<AnyTransmissionID>;
1449
1450 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1451 any_transmission_id().prop_map(AnyTransmissionID).boxed()
1452 }
1453 }
1454
1455 fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> {
1456 prop_oneof![
1457 (collection::vec(any::<u8>(), 512..=512))
1458 .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))),
1459 (collection::vec(any::<u8>(), 2048..=2048))
1460 .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))),
1461 ]
1462 .boxed()
1463 }
1464
1465 pub fn any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> {
1466 Just(0).prop_perturb(|_, rng| CryptoTestRng(rng).r#gen::<u64>().into()).boxed()
1467 }
1468
1469 pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> {
1470 Just(0)
1471 .prop_perturb(|_, rng| {
1472 <CurrentNetwork as Network>::TransactionID::from(Field::rand(&mut CryptoTestRng(rng)))
1473 })
1474 .boxed()
1475 }
1476
1477 pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> {
1478 prop_oneof![
1479 any_transaction_id().prop_perturb(|id, mut rng| TransmissionID::Transaction(
1480 id,
1481 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1482 )),
1483 any_solution_id().prop_perturb(|id, mut rng| TransmissionID::Solution(
1484 id,
1485 rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1486 )),
1487 ]
1488 .boxed()
1489 }
1490
1491 pub fn sign_batch_header<R: Rng + CryptoRng>(
1492 validator_set: &ValidatorSet,
1493 batch_header: &BatchHeader<CurrentNetwork>,
1494 rng: &mut R,
1495 ) -> IndexSet<Signature<CurrentNetwork>> {
1496 let mut signatures = IndexSet::with_capacity(validator_set.0.len());
1497 for validator in validator_set.0.iter() {
1498 let private_key = validator.private_key;
1499 signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap());
1500 }
1501 signatures
1502 }
1503
1504 #[proptest]
1505 fn test_certificate_duplicate(
1506 context: CommitteeContext,
1507 #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>,
1508 mut rng: CryptoTestRng,
1509 selector: Selector,
1510 ) {
1511 let CommitteeContext(committee, ValidatorSet(validators)) = context;
1512 let committee_id = committee.id();
1513
1514 let ledger = Arc::new(MockLedgerService::new(committee));
1516 let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1517
1518 assert_storage(&storage, &[], &[], &[], &Default::default());
1520
1521 let signer = selector.select(&validators);
1523
1524 let mut transmission_map = IndexMap::new();
1525
1526 for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() {
1527 transmission_map.insert(*id, t.clone());
1528 }
1529
1530 let batch_header = BatchHeader::new(
1531 &signer.private_key,
1532 0,
1533 now(),
1534 committee_id,
1535 transmission_map.keys().cloned().collect(),
1536 Default::default(),
1537 &mut rng,
1538 )
1539 .unwrap();
1540
1541 let mut validators = validators.clone();
1544 validators.remove(signer);
1545
1546 let certificate = BatchCertificate::from(
1547 batch_header.clone(),
1548 sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng),
1549 )
1550 .unwrap();
1551
1552 let certificate_id = certificate.id();
1554 let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1555 for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() {
1556 internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id);
1557 }
1558
1559 let round = certificate.round();
1561 let author = certificate.author();
1563
1564 let rounds = [(round, indexset! { (certificate_id, author) })];
1566 let certificates = [(certificate_id, certificate.clone())];
1568 let batch_ids = [(certificate_id, round)];
1570
1571 let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> =
1573 transmission_map.into_iter().collect();
1574 storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1575 assert!(storage.contains_certificate(certificate_id));
1577 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1579
1580 storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1582 assert!(storage.contains_certificate(certificate_id));
1584 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1586
1587 storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1589 assert!(storage.contains_certificate(certificate_id));
1591 assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1593 }
1594}