1use crate::{
17 Gateway,
18 MAX_BATCH_DELAY_IN_MS,
19 MAX_WORKERS,
20 MIN_BATCH_DELAY_IN_SECS,
21 PRIMARY_PING_IN_MS,
22 Sync,
23 Transport,
24 WORKER_PING_IN_MS,
25 Worker,
26 events::{BatchPropose, BatchSignature, Event},
27 helpers::{
28 BFTSender,
29 PrimaryReceiver,
30 PrimarySender,
31 Proposal,
32 ProposalCache,
33 SignedProposals,
34 Storage,
35 assign_to_worker,
36 assign_to_workers,
37 fmt_id,
38 init_sync_channels,
39 init_worker_channels,
40 now,
41 },
42 spawn_blocking,
43};
44use snarkos_account::Account;
45use snarkos_node_bft_events::PrimaryPing;
46use snarkos_node_bft_ledger_service::LedgerService;
47use snarkos_node_sync::DUMMY_SELF_IP;
48use snarkvm::{
49 console::{
50 prelude::*,
51 types::{Address, Field},
52 },
53 ledger::{
54 block::Transaction,
55 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
56 puzzle::{Solution, SolutionID},
57 },
58 prelude::committee::Committee,
59};
60
61use colored::Colorize;
62use futures::stream::{FuturesUnordered, StreamExt};
63use indexmap::{IndexMap, IndexSet};
64use parking_lot::{Mutex, RwLock};
65use rayon::prelude::*;
66use std::{
67 collections::{HashMap, HashSet},
68 future::Future,
69 net::SocketAddr,
70 sync::Arc,
71 time::Duration,
72};
73use tokio::{
74 sync::{Mutex as TMutex, OnceCell},
75 task::JoinHandle,
76};
77
/// A shared, lock-guarded slot holding the primary's in-flight batch proposal (if any).
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
80
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers; populated in `run` (empty until then).
    workers: Arc<[Worker<N>]>,
    /// The BFT sender, set at most once in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The currently proposed batch, if this primary is proposing one.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the latest proposed batch (set from `Proposal::timestamp`).
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The proposals this primary has signed, keyed by author (see `process_batch_propose_from_peer`).
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles of the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Serializes batch proposing; holds the latest proposed (or cached certificate) round.
    propose_lock: Arc<TMutex<u64>>,
}
106
impl<N: Network> Primary<N> {
    /// The maximum tolerated number of unconfirmed transmissions: twice the per-batch transmission limit.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// Constructs the gateway and the sync module; the workers are not created here —
    /// they are populated later, in `run`.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        dev: Option<u16>,
    ) -> Result<Self> {
        // Construct the gateway.
        let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
        // Construct the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());

        // Construct the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
        })
    }

    /// Loads the proposal cache from the file system, if it exists, restoring the
    /// proposed batch, the signed proposals, and the propose-lock round, and then
    /// reprocessing any pending certificates through the sync path.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the proposal cache from the file system, if it exists.
        match ProposalCache::<N>::exists(self.gateway.dev()) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), self.gateway.dev()) {
                Ok(proposal_cache) => {
                    // Extract the cached round, proposal, signed proposals, and pending certificates.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Restore the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Restore the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Restore the propose lock to the latest certificate round.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Reprocess each pending certificate.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // DUMMY_SELF_IP is used as the peer IP, since this is a local load
                        // rather than a real peer request; on failure the certificate is
                        // skipped with a warning rather than aborting startup.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    // A cache that exists but cannot be read is treated as fatal.
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            // No cache on disk: nothing to restore.
            false => Ok(()),
        }
    }

    /// Runs the primary instance: sets the BFT sender, spins up the workers,
    /// initializes and starts the sync module, loads the proposal cache,
    /// starts the gateway, and finally spawns the primary's handler tasks.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender, if one was provided. `OnceCell::set` fails if already set.
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct the map of worker senders and the list of workers.
        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        // Initialize and start each worker.
        for id in 0..MAX_WORKERS {
            // Construct the channels for this worker.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Start the worker on its receiver.
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        // Install the workers.
        self.workers = Arc::from(workers);

        // Construct the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Initialize the sync module (before loading the proposal cache, which uses the sync path).
        self.sync.initialize(bft_sender).await?;
        // Restore any cached proposal state from a previous run.
        self.load_proposal_cache().await?;
        // Start the sync module.
        self.sync.run(sync_receiver).await?;
        // Start the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Spawn the primary's background handlers.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round (from storage).
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the node is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers; panics only if the worker count exceeds `u8::MAX`.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the currently proposed batch slot.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
279
280impl<N: Network> Primary<N> {
281 pub fn num_unconfirmed_transmissions(&self) -> usize {
283 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
284 }
285
286 pub fn num_unconfirmed_ratifications(&self) -> usize {
288 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
289 }
290
291 pub fn num_unconfirmed_solutions(&self) -> usize {
293 self.workers.iter().map(|worker| worker.num_solutions()).sum()
294 }
295
296 pub fn num_unconfirmed_transactions(&self) -> usize {
298 self.workers.iter().map(|worker| worker.num_transactions()).sum()
299 }
300}
301
impl<N: Network> Primary<N> {
    /// Returns an iterator over the transmission IDs held by all workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns an iterator over the (ID, transmission) pairs held by all workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns an iterator over the (solution ID, solution) pairs held by all workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns an iterator over the (transaction ID, transaction) pairs held by all workers.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}
323
324impl<N: Network> Primary<N> {
325 pub fn clear_worker_solutions(&self) {
327 self.workers.iter().for_each(Worker::clear_solutions);
328 }
329}
330
331impl<N: Network> Primary<N> {
    /// Proposes a batch for the current round, if this primary is in a position to do so.
    ///
    /// High-level flow: clear any expired proposal; bail out of the tick (returning `Ok`)
    /// on any of several "safe skip" conditions; otherwise drain transmissions from the
    /// workers, build and sign a batch header, store the proposal locally, and broadcast
    /// the header to the committee for signing.
    pub async fn propose_batch(&self) -> Result<()> {
        // Hold the propose lock for the whole call; this makes proposing non-reentrant.
        let mut lock_guard = self.propose_lock.lock().await;

        // Clear the proposed batch if it has expired; a failure here skips this tick.
        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        // Retrieve the current round and compute the previous round.
        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        // Round 0 never carries a batch.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If storage is behind the latest proposed round (e.g. after restoring the
        // proposal cache), do not propose yet.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If a proposal is already outstanding, re-send it to non-signers instead of
        // proposing a new batch.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Only rebroadcast if storage has caught up to the proposal: the storage round
            // must not be behind the proposal round, and every previous certificate the
            // proposal references must be present in storage.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the propose event once, then send it to each non-signer.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the validator's address to a connected peer IP; skip if disconnected.
                match self.gateway.resolver().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        // Send asynchronously so one slow peer does not stall the loop.
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Enforce the minimum delay since the previous proposal before proposing again.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
            return Ok(());
        }

        // If this primary already certified a batch for this round, do not propose another;
        // instead nudge the BFT to the current round and skip.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // `Ok(true)`: the BFT advanced; fall through to the skip below.
                    Ok(true) => (),
                    // `Ok(false)`: the BFT is not ready; skip this tick.
                    Ok(false) => return Ok(()),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // A batch was already proposed for exactly this round; do not propose twice.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee lookback for this round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            // Require quorum among connected validators (counting ourselves) before proposing.
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal {}",
                    "(please connect to more validators)".dimmed()
                );
                // `- 1` excludes our own address, inserted above.
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the certificates for the previous round.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // The previous round must have reached quorum (trivially true for round 0).
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Split the batch capacity evenly across the workers, then drain each worker.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        let mut transmissions: IndexMap<_, _> = Default::default();
        for worker in self.workers.iter() {
            let mut num_transmissions_included_for_worker = 0;
            // Keep draining until this worker's quota is filled or it runs dry.
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // An empty drain means the worker has nothing left.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                'inner: for (id, transmission) in worker_transmissions {
                    // Skip transmissions already in the ledger (or whose check errored).
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Skip transmissions already in storage — but only once at least one
                    // transmission has been included. NOTE(review): the `!transmissions.is_empty()`
                    // guard appears intended to let the first candidate through even if it is in
                    // storage (so a non-empty batch can still be proposed); confirm this intent.
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Validate the transmission by kind before including it.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            // The checksum in the ID must match the solution's actual checksum.
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // The solution must pass the ledger's basic checks.
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            // The checksum in the ID must match the transaction's actual checksum.
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // The transaction must pass the ledger's basic checks.
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                                continue 'inner;
                            }
                        }
                        // Ratifications are not included in proposals.
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // Mismatched (ID, transmission) kinds are dropped.
                        _ => continue 'inner,
                    }
                    // The transmission passed all checks; include it.
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        // Capture the proposal timestamp.
        let current_timestamp = now();

        // Record this round in the propose lock before constructing the proposal.
        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Gather the signing key and header inputs, then build the header and proposal
        // on a blocking thread (signing is CPU-bound).
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On failure, return the drained transmissions to the workers so they are not lost.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Broadcast the batch header for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Record the proposal timestamp and store the proposal.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }
622
    /// Processes a batch proposal from a peer: validates the proposal against the
    /// committee and this primary's signing history, fetches and checks any missing
    /// transmissions, and — if everything passes — signs the batch ID, records the
    /// signature in `signed_proposals`, and sends the signature back to the proposer.
    ///
    /// Malicious-looking proposals disconnect the peer and return an error; benign
    /// skip conditions return `Ok(())`.
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header on a blocking thread.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // The advertised round must match the round inside the batch header.
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        // Retrieve the batch author.
        let batch_author = batch_header.author();

        // The proposal must come from the peer that authored it.
        match self.gateway.resolver().get_address(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // The author must be an authorized validator.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // A proposal from our own address is invalid.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // The header's committee ID must match the committee lookback for this round.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // If we already signed a proposal from this author, handle the duplicate cases.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // A proposal older than the round we already signed is rejected.
            if signed_round > batch_header.round() {
                bail!("Peer ({batch_author}) proposed a batch for a previous round ({})", batch_header.round());
            }

            // A *different* batch for the round we already signed is rejected.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // The *same* batch for the round we already signed: resend our stored signature.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        // If the batch already exists in storage, there is nothing to sign.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Validate the proposal timestamp against the previous round; a bad timestamp
        // is treated as malicious.
        let previous_round = batch_round.saturating_sub(1);
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        // Ratification transmissions are not supported in proposals.
        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch contains an unsupported ratification transmissionID",);
        }

        // Fetch any transmissions referenced by the header that are missing locally.
        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        // Ensure every fetched transmission is well-formed (checked in parallel via `cfg_iter_mut!`).
        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch propose from '{peer_ip}' contains an invalid transmission - {err}",);
            return Ok(());
        }

        // Ensure this primary is still in a position to sign for this round.
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Check the batch header against storage (on a blocking thread), resolving the
        // final set of missing transmissions, then hand them to the workers.
        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // Sign the batch ID (signing is CPU-bound, so run it on a blocking thread).
        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        // Record the signed proposal; if an entry for this author and round already
        // exists (e.g. a concurrent signing), do not sign a second batch for the round.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Send the signature back to the proposing validator.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });
        Ok(())
    }
808
809 async fn process_batch_signature_from_peer(
818 &self,
819 peer_ip: SocketAddr,
820 batch_signature: BatchSignature<N>,
821 ) -> Result<()> {
822 self.check_proposed_batch_for_expiration().await?;
824
825 let BatchSignature { batch_id, signature } = batch_signature;
827
828 let signer = signature.to_address();
830
831 if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
833 self.gateway.disconnect(peer_ip);
835 bail!("Malicious peer - batch signature is from a different validator ({signer})");
836 }
837 if self.gateway.account().address() == signer {
839 bail!("Invalid peer - received a batch signature from myself ({signer})");
840 }
841
842 let self_ = self.clone();
843 let Some(proposal) = spawn_blocking!({
844 let mut proposed_batch = self_.proposed_batch.write();
846 match proposed_batch.as_mut() {
848 Some(proposal) => {
849 if proposal.batch_id() != batch_id {
851 match self_.storage.contains_batch(batch_id) {
852 true => {
854 debug!(
855 "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
856 proposal.round()
857 );
858 return Ok(None);
859 }
860 false => bail!(
862 "Unknown batch ID '{batch_id}', expected '{}' for round {}",
863 proposal.batch_id(),
864 proposal.round()
865 ),
866 }
867 }
868 let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
870 let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
872 bail!("Signature is from a disconnected validator");
873 };
874 proposal.add_signature(signer, signature, &committee_lookback)?;
876 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
877 if !proposal.is_quorum_threshold_reached(&committee_lookback) {
879 return Ok(None);
881 }
882 }
883 None => return Ok(None),
885 };
886 match proposed_batch.take() {
888 Some(proposal) => Ok(Some(proposal)),
889 None => Ok(None),
890 }
891 })?
892 else {
893 return Ok(());
894 };
895
896 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
899
900 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
902 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
905 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
907 return Err(e);
908 }
909
910 #[cfg(feature = "metrics")]
911 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
912 Ok(())
913 }
914
    /// Processes a batch certificate from a peer: validates the sender and committee,
    /// syncs the certificate into storage, and — if the certificate's round has reached
    /// quorum — advances this primary to the next round.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // The certificate must come from an authorized validator IP.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // If the certificate is already processed, return early; otherwise record it
        // in storage as unprocessed before handling it.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Capture the author, round, and committee ID before the certificate is moved.
        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        // A certificate authored by ourselves should not arrive from a peer.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Sync the certificate (and any missing ancestry/transmissions) into storage.
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // Determine whether the certificate's round has reached quorum among stored authors.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // The certificate's committee ID must match the committee lookback.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Only advance if we have no proposal pending for this round or later.
        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // Advance to the next round when quorum is reached, advancing is allowed, and
        // the certificate is not behind the current round.
        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
993}
994
995impl<N: Network> Primary<N> {
    /// Starts the primary's background handlers: periodic pings and proposal/round
    /// timers, plus receivers for proposals, signatures, certificates, and
    /// unconfirmed solutions/transactions.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Deconstruct the primary receiver into its individual channels.
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Periodically broadcast a primary ping (only in gateway mode).
        if self.sync.is_gateway_mode() {
            let self_ = self.clone();
            self.spawn(async move {
                loop {
                    // Sleep between pings.
                    tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                    // Retrieve the block locators (on a blocking thread).
                    let self__ = self_.clone();
                    let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                        Ok(block_locators) => block_locators,
                        Err(e) => {
                            warn!("Failed to retrieve block locators - {e}");
                            continue;
                        }
                    };

                    // Find this primary's most recent certificate, scanning backwards
                    // from the current round.
                    let primary_certificate = {
                        // Retrieve this primary's address.
                        let primary_address = self_.gateway.account().address();

                        let mut certificate = None;
                        let mut current_round = self_.current_round();
                        while certificate.is_none() {
                            // Stop if the scan reaches round 0 without finding one.
                            if current_round == 0 {
                                break;
                            }
                            // Check this round for a certificate authored by this primary.
                            if let Some(primary_certificate) =
                                self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                            {
                                certificate = Some(primary_certificate);
                            } else {
                                current_round = current_round.saturating_sub(1);
                            }
                        }

                        // If no certificate was found, skip this ping entirely.
                        match certificate {
                            Some(certificate) => certificate,
                            None => continue,
                        }
                    };

                    // Construct and broadcast the primary ping.
                    let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                    self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
                }
            });
        }

        // Process incoming primary pings.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // Skip pings while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Deserialize and process the peer's certificate in a separate task.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialization is CPU-bound, so it runs on a blocking thread.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Periodically broadcast worker pings (only in gateway mode).
        if self.sync.is_gateway_mode() {
            let self_ = self.clone();
            self.spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                    // Skip pings while the node is syncing.
                    if !self_.sync.is_synced() {
                        trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                        continue;
                    }
                    // Each worker broadcasts its own ping.
                    for worker in self_.workers.iter() {
                        worker.broadcast_ping();
                    }
                }
            });
        }

        // Periodically attempt to propose a batch.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // Skip proposing while the node is syncing.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal {}", "(node is syncing)".dimmed());
                    continue;
                }
                // If the propose lock is held, a proposal is already in flight — skip
                // this tick. The `try_lock` guard is dropped immediately, so
                // `propose_batch` below can re-acquire the lock itself.
                if self_.propose_lock.try_lock().is_err() {
                    trace!("Skipping batch proposal {}", "(node is already proposing)".dimmed());
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Process incoming batch proposals.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // Skip proposals while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process each proposal in its own task.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Process incoming batch signatures.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // Skip signatures while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // NOTE(review): unlike the propose/certificate handlers, signatures are
                // processed inline (not spawned) — presumably to handle them sequentially;
                // confirm before changing.
                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    warn!("Cannot store a signature from '{peer_ip}' - {e}");
                }
            }
        });

        // Process incoming certified batches.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // Skip certificates while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Deserialize and process each certificate in its own task.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!("Cannot store a certificate from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Periodically try to advance to the next round once its certificates reach quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // Skip while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Compute the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Check whether the stored certificate authors for the next round reach quorum.
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // No certificates for the next round yet — nothing to do.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // If quorum is reached, attempt to advance.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process unconfirmed solutions, routing each to its assigned worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the solution checksum; skip the solution on failure.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Deterministically assign the solution to a worker by (ID, checksum).
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                // Process the solution in its own task, reporting the result via the callback.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // The callback receiver may be gone; ignore send failures.
                    callback.send(result).ok();
                });
            }
        });

        // Process unconfirmed transactions, routing each to its assigned worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the transaction checksum; skip the transaction on failure.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Deterministically assign the transaction to a worker by (ID, checksum).
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                // Process the transaction in its own task, reporting the result via the callback.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // The callback receiver may be gone; ignore send failures.
                    callback.send(result).ok();
                });
            }
        });
    }
1296
1297 async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1299 let is_expired = match self.proposed_batch.read().as_ref() {
1301 Some(proposal) => proposal.round() < self.current_round(),
1302 None => false,
1303 };
1304 if is_expired {
1306 let proposal = self.proposed_batch.write().take();
1308 if let Some(proposal) = proposal {
1309 debug!("Cleared expired proposal for round {}", proposal.round());
1310 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1311 }
1312 }
1313 Ok(())
1314 }
1315
    /// Attempts to advance the primary to `next_round`.
    ///
    /// If `next_round` is within the GC window, storage is first fast-forwarded
    /// one round at a time (dropping any pending proposal at each step). Then,
    /// if we are still behind, either the BFT (when running) is asked whether we
    /// may advance, or storage is advanced directly; a new batch is proposed
    /// once ready.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // Only fast-forward when the target round is within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Walk storage up to the round just before `next_round`.
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Any proposal for a skipped round is now stale; drop it.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        // Only proceed if we are still behind the target round.
        if current_round < next_round {
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                // With the BFT running, it decides whether we may advance.
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            // Without a BFT sender, advance storage directly and proceed.
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // Propose a batch for the new round once readiness is confirmed.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
1365
1366 fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1370 let current_round = self.current_round();
1372 if current_round + self.storage.max_gc_rounds() <= batch_round {
1374 bail!("Round {batch_round} is too far in the future")
1375 }
1376 if current_round > batch_round + 1 {
1380 bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1381 }
1382 if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1384 if signing_round > batch_round {
1385 bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1386 }
1387 }
1388 Ok(())
1389 }
1390
1391 fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1394 let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1396 Some(certificate) => certificate.timestamp(),
1398 None => match self.gateway.account().address() == author {
1399 true => *self.latest_proposed_batch_timestamp.read(),
1401 false => return Ok(()),
1403 },
1404 };
1405
1406 let elapsed = timestamp
1408 .checked_sub(previous_timestamp)
1409 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1410 match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1412 true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1413 false => Ok(()),
1414 }
1415 }
1416
    /// Certifies the given proposal, stores the resulting certificate together
    /// with its transmissions, informs the BFT DAG (when running), broadcasts
    /// the certificate to peers, and advances the primary to the next round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Certify the proposal under `block_in_place` so the synchronous work
        // does not stall the async executor thread.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into the map form expected by storage.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Insert the certificate via a blocking task.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If the BFT is running, hand the certificate to the DAG; a failure
        // here aborts before broadcasting.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Announce the new certificate to connected peers.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Move on to the next round now that this round's batch is certified.
        self.try_increment_to_the_next_round(round + 1).await
    }
1445
1446 fn insert_missing_transmissions_into_workers(
1448 &self,
1449 peer_ip: SocketAddr,
1450 transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1451 ) -> Result<()> {
1452 assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1454 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1455 })
1456 }
1457
1458 fn reinsert_transmissions_into_workers(
1460 &self,
1461 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1462 ) -> Result<()> {
1463 assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1465 worker.reinsert(transmission_id, transmission);
1466 })
1467 }
1468
    /// Recursively processes a certificate received from `peer_ip`: first syncs
    /// its batch header (fetching any missing transmissions and ancestor
    /// certificates), then inserts it into storage and notifies the BFT DAG.
    ///
    /// Certificates at or below the GC round, or already stored, are ignored.
    /// When `IS_SYNCING` is false, the node must already be synced.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Ignore certificates at or below the garbage-collection round.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // Nothing to do if the certificate is already stored.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // Outside of explicit sync, refuse work until the node has caught up.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync the header first; this returns the transmissions we were missing.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Re-check before inserting, in case the certificate appeared while the
        // header was being synced.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the certificate together with its missing transmissions.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // Notify the BFT DAG of the newly stored certificate, if running.
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }
1526
    /// Syncs the primary with a batch header from `peer_ip`: validates its
    /// round, fast-forwards our round when the network is ahead, concurrently
    /// fetches the transmissions and previous certificates we are missing, and
    /// recursively syncs each fetched ancestor certificate.
    ///
    /// Returns the header's transmissions that were missing locally.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // Reject headers at or below the garbage-collection round.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // Outside of explicit sync, refuse work until the node has caught up.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Check whether the batch's round already has a quorum of certificate authors.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // We are behind schedule if quorum exists for a round ahead of ours;
        // the peer is far in the future if its round is beyond our GC horizon.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // In either case, try to fast-forward to the batch's round.
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Fetch missing transmissions and missing previous certificates concurrently.
        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);

        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
            missing_transmissions_handle,
            missing_previous_certificates_handle,
        ).map_err(|e| {
            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        // Recursively sync each missing ancestor certificate before returning.
        for batch_certificate in missing_previous_certificates {
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }
1590
1591 async fn fetch_missing_transmissions(
1594 &self,
1595 peer_ip: SocketAddr,
1596 batch_header: &BatchHeader<N>,
1597 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1598 if batch_header.round() <= self.storage.gc_round() {
1600 return Ok(Default::default());
1601 }
1602
1603 if self.storage.contains_batch(batch_header.batch_id()) {
1605 trace!("Batch for round {} from peer has already been processed", batch_header.round());
1606 return Ok(Default::default());
1607 }
1608
1609 let workers = self.workers.clone();
1611
1612 let mut fetch_transmissions = FuturesUnordered::new();
1614
1615 let num_workers = self.num_workers();
1617 for transmission_id in batch_header.transmission_ids() {
1619 if !self.storage.contains_transmission(*transmission_id) {
1621 let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1623 bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1624 };
1625 let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1627 fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1629 }
1630 }
1631
1632 let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1634 while let Some(result) = fetch_transmissions.next().await {
1636 let (transmission_id, transmission) = result?;
1638 transmissions.insert(transmission_id, transmission);
1640 }
1641 Ok(transmissions)
1643 }
1644
1645 async fn fetch_missing_previous_certificates(
1647 &self,
1648 peer_ip: SocketAddr,
1649 batch_header: &BatchHeader<N>,
1650 ) -> Result<HashSet<BatchCertificate<N>>> {
1651 let round = batch_header.round();
1653 if round == 1 || round <= self.storage.gc_round() + 1 {
1655 return Ok(Default::default());
1656 }
1657
1658 let missing_previous_certificates =
1660 self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1661 if !missing_previous_certificates.is_empty() {
1662 debug!(
1663 "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1664 missing_previous_certificates.len(),
1665 );
1666 }
1667 Ok(missing_previous_certificates)
1669 }
1670
1671 async fn fetch_missing_certificates(
1673 &self,
1674 peer_ip: SocketAddr,
1675 round: u64,
1676 certificate_ids: &IndexSet<Field<N>>,
1677 ) -> Result<HashSet<BatchCertificate<N>>> {
1678 let mut fetch_certificates = FuturesUnordered::new();
1680 let mut missing_certificates = HashSet::default();
1682 for certificate_id in certificate_ids {
1684 if self.ledger.contains_certificate(certificate_id)? {
1686 continue;
1687 }
1688 if self.storage.contains_certificate(*certificate_id) {
1690 continue;
1691 }
1692 if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1694 missing_certificates.insert(certificate);
1695 } else {
1696 trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1698 fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1701 }
1702 }
1703
1704 match fetch_certificates.is_empty() {
1706 true => return Ok(missing_certificates),
1707 false => trace!(
1708 "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1709 fetch_certificates.len(),
1710 ),
1711 }
1712
1713 while let Some(result) = fetch_certificates.next().await {
1715 missing_certificates.insert(result?);
1717 }
1718 Ok(missing_certificates)
1720 }
1721}
1722
impl<N: Network> Primary<N> {
    /// Spawns `future` on the tokio runtime and records its handle so the task
    /// can be aborted on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary: stops the workers, aborts all spawned tasks,
    /// persists the current proposal state to disk, and shuts down the gateway.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Stop every worker first.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort all background tasks spawned via `Self::spawn`.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Snapshot the proposal state (current proposal, signed proposals,
        // latest round, pending certificates) for persistence.
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            // Fall back to the propose-lock round when no proposal is pending.
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        // Persisting the cache is best-effort; log failures and keep shutting down.
        if let Err(err) = proposal_cache.store(self.gateway.dev()) {
            error!("Failed to store the current proposal cache: {err}");
        }
        // Finally, shut down the gateway.
        self.gateway.shut_down().await;
    }
}
1751
1752#[cfg(test)]
1753mod tests {
1754 use super::*;
1755 use snarkos_node_bft_ledger_service::MockLedgerService;
1756 use snarkos_node_bft_storage_service::BFTMemoryService;
1757 use snarkvm::{
1758 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1759 prelude::{Address, Signature},
1760 };
1761
1762 use bytes::Bytes;
1763 use indexmap::IndexSet;
1764 use rand::RngCore;
1765
1766 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1767
1768 async fn primary_without_handlers(
1770 rng: &mut TestRng,
1771 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1772 let (accounts, committee) = {
1774 const COMMITTEE_SIZE: usize = 4;
1775 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1776 let mut members = IndexMap::new();
1777
1778 for i in 0..COMMITTEE_SIZE {
1779 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1780 let account = Account::new(rng).unwrap();
1781 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1782 accounts.push((socket_addr, account));
1783 }
1784
1785 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1786 };
1787
1788 let account = accounts.first().unwrap().1.clone();
1789 let ledger = Arc::new(MockLedgerService::new(committee));
1790 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1791
1792 let mut primary = Primary::new(account, storage, ledger, None, &[], None).unwrap();
1794
1795 primary.workers = Arc::from([Worker::new(
1797 0, Arc::new(primary.gateway.clone()),
1799 primary.storage.clone(),
1800 primary.ledger.clone(),
1801 primary.proposed_batch.clone(),
1802 )
1803 .unwrap()]);
1804 for a in accounts.iter() {
1805 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1806 }
1807
1808 (primary, accounts)
1809 }
1810
1811 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1813 let solution_id = rng.gen::<u64>().into();
1815 let size = rng.gen_range(1024..10 * 1024);
1817 let mut vec = vec![0u8; size];
1819 rng.fill_bytes(&mut vec);
1820 let solution = Data::Buffer(Bytes::from(vec));
1821 (solution_id, solution)
1823 }
1824
1825 fn sample_unconfirmed_transaction(
1827 rng: &mut TestRng,
1828 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1829 let id = Field::<CurrentNetwork>::rand(rng).into();
1831 let size = rng.gen_range(1024..10 * 1024);
1833 let mut vec = vec![0u8; size];
1835 rng.fill_bytes(&mut vec);
1836 let transaction = Data::Buffer(Bytes::from(vec));
1837 (id, transaction)
1839 }
1840
1841 fn create_test_proposal(
1843 author: &Account<CurrentNetwork>,
1844 committee: Committee<CurrentNetwork>,
1845 round: u64,
1846 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1847 timestamp: i64,
1848 rng: &mut TestRng,
1849 ) -> Proposal<CurrentNetwork> {
1850 let (solution_id, solution) = sample_unconfirmed_solution(rng);
1851 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1852 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1853 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1854
1855 let solution_transmission_id = (solution_id, solution_checksum).into();
1856 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1857
1858 let private_key = author.private_key();
1860 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1862 let transmissions = [
1863 (solution_transmission_id, Transmission::Solution(solution)),
1864 (transaction_transmission_id, Transmission::Transaction(transaction)),
1865 ]
1866 .into();
1867 let batch_header = BatchHeader::new(
1869 private_key,
1870 round,
1871 timestamp,
1872 committee.id(),
1873 transmission_ids,
1874 previous_certificate_ids,
1875 rng,
1876 )
1877 .unwrap();
1878 Proposal::new(committee, batch_header, transmissions).unwrap()
1880 }
1881
1882 fn peer_signatures_for_proposal(
1885 primary: &Primary<CurrentNetwork>,
1886 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1887 rng: &mut TestRng,
1888 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
1889 let mut signatures = Vec::with_capacity(accounts.len() - 1);
1891 for (socket_addr, account) in accounts {
1892 if account.address() == primary.gateway.account().address() {
1893 continue;
1894 }
1895 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
1896 let signature = account.sign(&[batch_id], rng).unwrap();
1897 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
1898 }
1899
1900 signatures
1901 }
1902
1903 fn peer_signatures_for_batch(
1905 primary_address: Address<CurrentNetwork>,
1906 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1907 batch_id: Field<CurrentNetwork>,
1908 rng: &mut TestRng,
1909 ) -> IndexSet<Signature<CurrentNetwork>> {
1910 let mut signatures = IndexSet::new();
1911 for (_, account) in accounts {
1912 if account.address() == primary_address {
1913 continue;
1914 }
1915 let signature = account.sign(&[batch_id], rng).unwrap();
1916 signatures.insert(signature);
1917 }
1918 signatures
1919 }
1920
    /// Builds a batch certificate authored by `primary_address` for `round`,
    /// containing one random solution and one random transaction, and signed by
    /// every other account. Returns the certificate and its transmissions.
    ///
    /// Panics if `primary_address` does not match any account in `accounts`.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Locate the author's account among the committee accounts.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Sample a random committee ID.
        let committee_id = Field::rand(rng);
        // Sample one solution and one transaction, plus their checksums.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        // Assemble the transmission IDs and the transmission payload map.
        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header as the author.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Collect signatures from all other accounts and form the certificate.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
1965
1966 fn store_certificate_chain(
1968 primary: &Primary<CurrentNetwork>,
1969 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1970 round: u64,
1971 rng: &mut TestRng,
1972 ) -> IndexSet<Field<CurrentNetwork>> {
1973 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1974 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1975 for cur_round in 1..round {
1976 for (_, account) in accounts.iter() {
1977 let (certificate, transmissions) = create_batch_certificate(
1978 account.address(),
1979 accounts,
1980 cur_round,
1981 previous_certificates.clone(),
1982 rng,
1983 );
1984 next_certificates.insert(certificate.id());
1985 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1986 }
1987
1988 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1989 previous_certificates = next_certificates;
1990 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1991 }
1992
1993 previous_certificates
1994 }
1995
1996 fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
1999 for (addr, acct) in accounts.iter().skip(1) {
2001 primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2002 }
2003 }
2004
2005 #[tokio::test]
2006 async fn test_propose_batch() {
2007 let mut rng = TestRng::default();
2008 let (primary, _) = primary_without_handlers(&mut rng).await;
2009
2010 assert!(primary.proposed_batch.read().is_none());
2012
2013 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2015 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2016
2017 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2019 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2020
2021 assert!(primary.propose_batch().await.is_ok());
2023 assert!(primary.proposed_batch.read().is_some());
2024 }
2025
2026 #[tokio::test]
2027 async fn test_propose_batch_with_no_transmissions() {
2028 let mut rng = TestRng::default();
2029 let (primary, _) = primary_without_handlers(&mut rng).await;
2030
2031 assert!(primary.proposed_batch.read().is_none());
2033
2034 assert!(primary.propose_batch().await.is_ok());
2036 assert!(primary.proposed_batch.read().is_some());
2037 }
2038
2039 #[tokio::test]
2040 async fn test_propose_batch_in_round() {
2041 let round = 3;
2042 let mut rng = TestRng::default();
2043 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2044
2045 store_certificate_chain(&primary, &accounts, round, &mut rng);
2047
2048 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2050
2051 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2053 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2054
2055 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2057 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2058
2059 assert!(primary.propose_batch().await.is_ok());
2061 assert!(primary.proposed_batch.read().is_some());
2062 }
2063
    /// Verifies that proposing a batch skips transmissions already covered by
    /// certificates from the previous round, keeping only fresh transmissions.
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Build the certificate chain for rounds before `round`.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let mut num_transmissions_in_previous_round = 0;

        // Sample a fresh solution and transaction that appear in no certificate.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Only the two fresh transmissions are in the worker so far.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Store one certificate per account for `round`, feeding each
        // certificate's transmissions into the worker as if received from the peer.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Respect the minimum delay between consecutive batches.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // The worker now holds the certified transmissions plus the two fresh ones.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        assert!(primary.propose_batch().await.is_ok());

        // The proposal must contain only the two fresh transmissions.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }
2135
2136 #[tokio::test]
2137 async fn test_batch_propose_from_peer() {
2138 let mut rng = TestRng::default();
2139 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2140
2141 let round = 1;
2143 let peer_account = &accounts[1];
2144 let peer_ip = peer_account.0;
2145 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2146 let proposal = create_test_proposal(
2147 &peer_account.1,
2148 primary.ledger.current_committee().unwrap(),
2149 round,
2150 Default::default(),
2151 timestamp,
2152 &mut rng,
2153 );
2154
2155 for (transmission_id, transmission) in proposal.transmissions() {
2157 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2158 }
2159
2160 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2162 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2164
2165 assert!(
2167 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2168 );
2169 }
2170
2171 #[tokio::test]
2172 async fn test_batch_propose_from_peer_when_not_synced() {
2173 let mut rng = TestRng::default();
2174 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2175
2176 let round = 1;
2178 let peer_account = &accounts[1];
2179 let peer_ip = peer_account.0;
2180 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2181 let proposal = create_test_proposal(
2182 &peer_account.1,
2183 primary.ledger.current_committee().unwrap(),
2184 round,
2185 Default::default(),
2186 timestamp,
2187 &mut rng,
2188 );
2189
2190 for (transmission_id, transmission) in proposal.transmissions() {
2192 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2193 }
2194
2195 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2197
2198 assert!(
2200 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2201 );
2202 }
2203
2204 #[tokio::test]
2205 async fn test_batch_propose_from_peer_in_round() {
2206 let round = 2;
2207 let mut rng = TestRng::default();
2208 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2209
2210 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2212
2213 let peer_account = &accounts[1];
2215 let peer_ip = peer_account.0;
2216 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2217 let proposal = create_test_proposal(
2218 &peer_account.1,
2219 primary.ledger.current_committee().unwrap(),
2220 round,
2221 previous_certificates,
2222 timestamp,
2223 &mut rng,
2224 );
2225
2226 for (transmission_id, transmission) in proposal.transmissions() {
2228 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2229 }
2230
2231 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2233 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2235
2236 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2238 }
2239
2240 #[tokio::test]
2241 async fn test_batch_propose_from_peer_wrong_round() {
2242 let mut rng = TestRng::default();
2243 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2244
2245 let round = 1;
2247 let peer_account = &accounts[1];
2248 let peer_ip = peer_account.0;
2249 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2250 let proposal = create_test_proposal(
2251 &peer_account.1,
2252 primary.ledger.current_committee().unwrap(),
2253 round,
2254 Default::default(),
2255 timestamp,
2256 &mut rng,
2257 );
2258
2259 for (transmission_id, transmission) in proposal.transmissions() {
2261 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2262 }
2263
2264 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2266 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2268
2269 assert!(
2271 primary
2272 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2273 round: round + 1,
2274 batch_header: Data::Object(proposal.batch_header().clone())
2275 })
2276 .await
2277 .is_err()
2278 );
2279 }
2280
2281 #[tokio::test]
2282 async fn test_batch_propose_from_peer_in_round_wrong_round() {
2283 let round = 4;
2284 let mut rng = TestRng::default();
2285 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2286
2287 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2289
2290 let peer_account = &accounts[1];
2292 let peer_ip = peer_account.0;
2293 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2294 let proposal = create_test_proposal(
2295 &peer_account.1,
2296 primary.ledger.current_committee().unwrap(),
2297 round,
2298 previous_certificates,
2299 timestamp,
2300 &mut rng,
2301 );
2302
2303 for (transmission_id, transmission) in proposal.transmissions() {
2305 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2306 }
2307
2308 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2310 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2312
2313 assert!(
2315 primary
2316 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2317 round: round + 1,
2318 batch_header: Data::Object(proposal.batch_header().clone())
2319 })
2320 .await
2321 .is_err()
2322 );
2323 }
2324
2325 #[tokio::test]
2326 async fn test_batch_propose_from_peer_with_invalid_timestamp() {
2327 let round = 2;
2328 let mut rng = TestRng::default();
2329 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2330
2331 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2333
2334 let peer_account = &accounts[1];
2336 let peer_ip = peer_account.0;
2337 let invalid_timestamp = now(); let proposal = create_test_proposal(
2339 &peer_account.1,
2340 primary.ledger.current_committee().unwrap(),
2341 round,
2342 previous_certificates,
2343 invalid_timestamp,
2344 &mut rng,
2345 );
2346
2347 for (transmission_id, transmission) in proposal.transmissions() {
2349 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2350 }
2351
2352 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2354 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2356
2357 assert!(
2359 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2360 );
2361 }
2362
2363 #[tokio::test]
2364 async fn test_batch_propose_from_peer_with_past_timestamp() {
2365 let round = 2;
2366 let mut rng = TestRng::default();
2367 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2368
2369 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2371
2372 let peer_account = &accounts[1];
2374 let peer_ip = peer_account.0;
2375 let past_timestamp = now() - 5; let proposal = create_test_proposal(
2377 &peer_account.1,
2378 primary.ledger.current_committee().unwrap(),
2379 round,
2380 previous_certificates,
2381 past_timestamp,
2382 &mut rng,
2383 );
2384
2385 for (transmission_id, transmission) in proposal.transmissions() {
2387 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2388 }
2389
2390 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2392 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2394
2395 assert!(
2397 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2398 );
2399 }
2400
2401 #[tokio::test]
2402 async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2403 let round = 3;
2404 let mut rng = TestRng::default();
2405 let (primary, _) = primary_without_handlers(&mut rng).await;
2406
2407 assert!(primary.proposed_batch.read().is_none());
2409
2410 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2412 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2413
2414 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2416 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2417
2418 let old_proposal_lock_round = *primary.propose_lock.lock().await;
2420 *primary.propose_lock.lock().await = round + 1;
2421
2422 assert!(primary.propose_batch().await.is_ok());
2424 assert!(primary.proposed_batch.read().is_none());
2425
2426 *primary.propose_lock.lock().await = old_proposal_lock_round;
2428
2429 assert!(primary.propose_batch().await.is_ok());
2431 assert!(primary.proposed_batch.read().is_some());
2432 }
2433
2434 #[tokio::test]
2435 async fn test_propose_batch_with_storage_round_behind_proposal() {
2436 let round = 5;
2437 let mut rng = TestRng::default();
2438 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2439
2440 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2442
2443 let timestamp = now();
2445 let proposal = create_test_proposal(
2446 primary.gateway.account(),
2447 primary.ledger.current_committee().unwrap(),
2448 round + 1,
2449 previous_certificates,
2450 timestamp,
2451 &mut rng,
2452 );
2453
2454 *primary.proposed_batch.write() = Some(proposal);
2456
2457 assert!(primary.propose_batch().await.is_ok());
2459 assert!(primary.proposed_batch.read().is_some());
2460 assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2461 }
2462
2463 #[tokio::test(flavor = "multi_thread")]
2464 async fn test_batch_signature_from_peer() {
2465 let mut rng = TestRng::default();
2466 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2467 map_account_addresses(&primary, &accounts);
2468
2469 let round = 1;
2471 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2472 let proposal = create_test_proposal(
2473 primary.gateway.account(),
2474 primary.ledger.current_committee().unwrap(),
2475 round,
2476 Default::default(),
2477 timestamp,
2478 &mut rng,
2479 );
2480
2481 *primary.proposed_batch.write() = Some(proposal);
2483
2484 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2486
2487 for (socket_addr, signature) in signatures {
2489 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2490 }
2491
2492 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2494 assert_eq!(primary.current_round(), round + 1);
2496 }
2497
2498 #[tokio::test(flavor = "multi_thread")]
2499 async fn test_batch_signature_from_peer_in_round() {
2500 let round = 5;
2501 let mut rng = TestRng::default();
2502 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2503 map_account_addresses(&primary, &accounts);
2504
2505 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2507
2508 let timestamp = now();
2510 let proposal = create_test_proposal(
2511 primary.gateway.account(),
2512 primary.ledger.current_committee().unwrap(),
2513 round,
2514 previous_certificates,
2515 timestamp,
2516 &mut rng,
2517 );
2518
2519 *primary.proposed_batch.write() = Some(proposal);
2521
2522 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2524
2525 for (socket_addr, signature) in signatures {
2527 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2528 }
2529
2530 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2532 assert_eq!(primary.current_round(), round + 1);
2534 }
2535
2536 #[tokio::test]
2537 async fn test_batch_signature_from_peer_no_quorum() {
2538 let mut rng = TestRng::default();
2539 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2540 map_account_addresses(&primary, &accounts);
2541
2542 let round = 1;
2544 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2545 let proposal = create_test_proposal(
2546 primary.gateway.account(),
2547 primary.ledger.current_committee().unwrap(),
2548 round,
2549 Default::default(),
2550 timestamp,
2551 &mut rng,
2552 );
2553
2554 *primary.proposed_batch.write() = Some(proposal);
2556
2557 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2559
2560 let (socket_addr, signature) = signatures.first().unwrap();
2562 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2563
2564 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2566 assert_eq!(primary.current_round(), round);
2568 }
2569
2570 #[tokio::test]
2571 async fn test_batch_signature_from_peer_in_round_no_quorum() {
2572 let round = 7;
2573 let mut rng = TestRng::default();
2574 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2575 map_account_addresses(&primary, &accounts);
2576
2577 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2579
2580 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2582 let proposal = create_test_proposal(
2583 primary.gateway.account(),
2584 primary.ledger.current_committee().unwrap(),
2585 round,
2586 previous_certificates,
2587 timestamp,
2588 &mut rng,
2589 );
2590
2591 *primary.proposed_batch.write() = Some(proposal);
2593
2594 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2596
2597 let (socket_addr, signature) = signatures.first().unwrap();
2599 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2600
2601 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2603 assert_eq!(primary.current_round(), round);
2605 }
2606
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Advance storage with a chain of certificates up to `round`.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Collect the certificate IDs of the previous round to link the new certificate against.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Sample one solution and one transaction and hand them to the worker.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // The worker should now hold exactly the two transmissions above.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate (signed by the first account) referencing the previous round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly split the certificate's transmissions into "aborted" and "present" sets.
        // The `|| aborted_transmissions.is_empty()` clause forces at least one aborted entry;
        // note `rng.gen` is still evaluated first, so the RNG stream is advanced every iteration.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Mark this transmission as aborted (its payload is withheld from storage).
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Keep this transmission's payload available.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Give the worker only the non-aborted transmissions, as if received from a peer.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Without declaring the aborted IDs, the certificate is missing transmissions,
        // so both checking and inserting it must fail.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Supplying the aborted transmission IDs makes the insertion succeed.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // The certificate is now stored; each aborted ID is tracked as a transmission
        // but yields no payload when fetched.
        assert!(primary.storage.contains_certificate(certificate_id));
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
2688}