1use crate::{
17 Gateway,
18 MAX_BATCH_DELAY_IN_MS,
19 MAX_WORKERS,
20 MIN_BATCH_DELAY_IN_SECS,
21 PRIMARY_PING_IN_MS,
22 Sync,
23 Transport,
24 WORKER_PING_IN_MS,
25 Worker,
26 events::{BatchPropose, BatchSignature, Event},
27 helpers::{
28 BFTSender,
29 PrimaryReceiver,
30 PrimarySender,
31 Proposal,
32 ProposalCache,
33 SignedProposals,
34 Storage,
35 assign_to_worker,
36 assign_to_workers,
37 fmt_id,
38 init_sync_channels,
39 init_worker_channels,
40 now,
41 },
42 spawn_blocking,
43};
44use snarkos_account::Account;
45use snarkos_node_bft_events::PrimaryPing;
46use snarkos_node_bft_ledger_service::LedgerService;
47use snarkos_node_sync::DUMMY_SELF_IP;
48use snarkvm::{
49 console::{
50 prelude::*,
51 types::{Address, Field},
52 },
53 ledger::{
54 block::Transaction,
55 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
56 puzzle::{Solution, SolutionID},
57 },
58 prelude::committee::Committee,
59};
60
61use colored::Colorize;
62use futures::stream::{FuturesUnordered, StreamExt};
63use indexmap::{IndexMap, IndexSet};
64use parking_lot::{Mutex, RwLock};
65use rayon::prelude::*;
66use std::{
67 collections::{HashMap, HashSet},
68 future::Future,
69 net::SocketAddr,
70 sync::Arc,
71 time::Duration,
72};
73use tokio::{
74 sync::{Mutex as TMutex, OnceCell},
75 task::JoinHandle,
76};
77
/// A lock-guarded slot holding the batch this primary is currently proposing,
/// or `None` when no proposal is in flight.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
80
/// The primary: it proposes batches, signs batches proposed by peers, and processes
/// batch signatures and batch certificates received from peers.
///
/// The `Arc`-wrapped fields are shared between clones, which is how the spawned
/// handler tasks (each working on a clone) observe the same state.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module; consulted for `is_synced()` and block locators.
    sync: Sync<N>,
    /// The gateway, used to send/broadcast events, disconnect peers, and resolve peer addresses.
    gateway: Gateway<N>,
    /// The storage, queried for the current round, certificates, batches, and transmissions.
    storage: Storage<N>,
    /// The ledger service, queried for committee lookbacks and transmission validity.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers; empty until `run` creates them.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender; set at most once, by `run`, when one is provided.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch currently being proposed by this primary, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recently proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The latest `(round, batch ID, signature)` this primary signed, keyed by batch author.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// Join handles of spawned tasks.
    /// NOTE(review): presumably filled by `Self::spawn`, which is outside this view - confirm.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Serializes `propose_batch`; the guarded value is the latest round this primary
    /// proposed a batch for (restored from the proposal cache on startup).
    propose_lock: Arc<TMutex<u64>>,
}
106
107impl<N: Network> Primary<N> {
108 pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
110
111 pub fn new(
113 account: Account<N>,
114 storage: Storage<N>,
115 ledger: Arc<dyn LedgerService<N>>,
116 ip: Option<SocketAddr>,
117 trusted_validators: &[SocketAddr],
118 dev: Option<u16>,
119 ) -> Result<Self> {
120 let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
122 let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());
124
125 Ok(Self {
127 sync,
128 gateway,
129 storage,
130 ledger,
131 workers: Arc::from(vec![]),
132 bft_sender: Default::default(),
133 proposed_batch: Default::default(),
134 latest_proposed_batch_timestamp: Default::default(),
135 signed_proposals: Default::default(),
136 handles: Default::default(),
137 propose_lock: Default::default(),
138 })
139 }
140
141 async fn load_proposal_cache(&self) -> Result<()> {
143 match ProposalCache::<N>::exists(self.gateway.dev()) {
145 true => match ProposalCache::<N>::load(self.gateway.account().address(), self.gateway.dev()) {
147 Ok(proposal_cache) => {
148 let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
150 proposal_cache.into();
151
152 *self.proposed_batch.write() = proposed_batch;
154 *self.signed_proposals.write() = signed_proposals;
156 *self.propose_lock.lock().await = latest_certificate_round;
158
159 for certificate in pending_certificates {
161 let batch_id = certificate.batch_id();
162 if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
166 {
167 warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
168 }
169 }
170 Ok(())
171 }
172 Err(err) => {
173 bail!("Failed to read the signed proposals from the file system - {err}.");
174 }
175 },
176 false => Ok(()),
178 }
179 }
180
181 pub async fn run(
183 &mut self,
184 bft_sender: Option<BFTSender<N>>,
185 primary_sender: PrimarySender<N>,
186 primary_receiver: PrimaryReceiver<N>,
187 ) -> Result<()> {
188 info!("Starting the primary instance of the memory pool...");
189
190 if let Some(bft_sender) = &bft_sender {
192 self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
194 }
195
196 let mut worker_senders = IndexMap::new();
198 let mut workers = Vec::new();
200 for id in 0..MAX_WORKERS {
202 let (tx_worker, rx_worker) = init_worker_channels();
204 let worker = Worker::new(
206 id,
207 Arc::new(self.gateway.clone()),
208 self.storage.clone(),
209 self.ledger.clone(),
210 self.proposed_batch.clone(),
211 )?;
212 worker.run(rx_worker);
214 workers.push(worker);
216 worker_senders.insert(id, tx_worker);
218 }
219 self.workers = Arc::from(workers);
221
222 let (sync_sender, sync_receiver) = init_sync_channels();
224 self.sync.initialize(bft_sender).await?;
226 self.load_proposal_cache().await?;
228 self.sync.run(sync_receiver).await?;
230 self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
232 self.start_handlers(primary_receiver);
235
236 Ok(())
237 }
238
239 pub fn current_round(&self) -> u64 {
241 self.storage.current_round()
242 }
243
244 pub fn is_synced(&self) -> bool {
246 self.sync.is_synced()
247 }
248
249 pub const fn gateway(&self) -> &Gateway<N> {
251 &self.gateway
252 }
253
254 pub const fn storage(&self) -> &Storage<N> {
256 &self.storage
257 }
258
259 pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
261 &self.ledger
262 }
263
264 pub fn num_workers(&self) -> u8 {
266 u8::try_from(self.workers.len()).expect("Too many workers")
267 }
268
269 pub const fn workers(&self) -> &Arc<[Worker<N>]> {
271 &self.workers
272 }
273
274 pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
276 &self.proposed_batch
277 }
278}
279
280impl<N: Network> Primary<N> {
281 pub fn num_unconfirmed_transmissions(&self) -> usize {
283 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
284 }
285
286 pub fn num_unconfirmed_ratifications(&self) -> usize {
288 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
289 }
290
291 pub fn num_unconfirmed_solutions(&self) -> usize {
293 self.workers.iter().map(|worker| worker.num_solutions()).sum()
294 }
295
296 pub fn num_unconfirmed_transactions(&self) -> usize {
298 self.workers.iter().map(|worker| worker.num_transactions()).sum()
299 }
300}
301
302impl<N: Network> Primary<N> {
303 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
305 self.workers.iter().flat_map(|worker| worker.transmission_ids())
306 }
307
308 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
310 self.workers.iter().flat_map(|worker| worker.transmissions())
311 }
312
313 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
315 self.workers.iter().flat_map(|worker| worker.solutions())
316 }
317
318 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
320 self.workers.iter().flat_map(|worker| worker.transactions())
321 }
322}
323
324impl<N: Network> Primary<N> {
325 pub fn clear_worker_solutions(&self) {
327 self.workers.iter().for_each(Worker::clear_solutions);
328 }
329}
330
331impl<N: Network> Primary<N> {
332 pub async fn propose_batch(&self) -> Result<()> {
340 let mut lock_guard = self.propose_lock.lock().await;
342
343 if let Err(e) = self.check_proposed_batch_for_expiration().await {
345 warn!("Failed to check the proposed batch for expiration - {e}");
346 return Ok(());
347 }
348
349 let round = self.current_round();
351 let previous_round = round.saturating_sub(1);
353
354 ensure!(round > 0, "Round 0 cannot have transaction batches");
356
357 if round < *lock_guard {
359 warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
360 return Ok(());
361 }
362
363 if let Some(proposal) = self.proposed_batch.read().as_ref() {
366 if round < proposal.round()
368 || proposal
369 .batch_header()
370 .previous_certificate_ids()
371 .iter()
372 .any(|id| !self.storage.contains_certificate(*id))
373 {
374 warn!(
375 "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
376 proposal.round(),
377 );
378 return Ok(());
379 }
380 let event = Event::BatchPropose(proposal.batch_header().clone().into());
383 for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
385 match self.gateway.resolver().get_peer_ip_for_address(address) {
387 Some(peer_ip) => {
389 let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
390 tokio::spawn(async move {
391 debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
392 if gateway.send(peer_ip, event_).await.is_none() {
394 warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
395 }
396 });
397 }
398 None => continue,
399 }
400 }
401 debug!("Proposed batch for round {} is still valid", proposal.round());
402 return Ok(());
403 }
404
405 #[cfg(feature = "metrics")]
406 metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
407
408 if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
410 debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
411 return Ok(());
412 }
413
414 if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
416 if let Some(bft_sender) = self.bft_sender.get() {
418 match bft_sender.send_primary_round_to_bft(self.current_round()).await {
419 Ok(true) => (), Ok(false) => return Ok(()),
423 Err(e) => {
425 warn!("Failed to update the BFT to the next round - {e}");
426 return Err(e);
427 }
428 }
429 }
430 debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
431 return Ok(());
432 }
433
434 if round == *lock_guard {
440 warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
441 return Ok(());
442 }
443
444 let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
446 {
448 let mut connected_validators = self.gateway.connected_addresses();
450 connected_validators.insert(self.gateway.account().address());
452 if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
454 debug!(
455 "Primary is safely skipping a batch proposal for round {round} {}",
456 "(please connect to more validators)".dimmed()
457 );
458 trace!("Primary is connected to {} validators", connected_validators.len() - 1);
459 return Ok(());
460 }
461 }
462
463 let previous_certificates = self.storage.get_certificates_for_round(previous_round);
465
466 let mut is_ready = previous_round == 0;
469 if previous_round > 0 {
471 let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
473 bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
474 };
475 let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
477 if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
479 is_ready = true;
480 }
481 }
482 if !is_ready {
484 debug!(
485 "Primary is safely skipping a batch proposal for round {round} {}",
486 format!("(previous round {previous_round} has not reached quorum)").dimmed()
487 );
488 return Ok(());
489 }
490
491 let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
493 let mut transmissions: IndexMap<_, _> = Default::default();
495 for worker in self.workers.iter() {
497 let mut num_transmissions_included_for_worker = 0;
499 'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
501 let num_remaining_transmissions =
503 num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
504 let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
506 if worker_transmissions.peek().is_none() {
508 break 'outer;
509 }
510 'inner: for (id, transmission) in worker_transmissions {
512 if self.ledger.contains_transmission(&id).unwrap_or(true) {
514 trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
515 continue 'inner;
516 }
517 if !transmissions.is_empty() && self.storage.contains_transmission(id) {
521 trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
522 continue 'inner;
523 }
524 match (id, transmission.clone()) {
526 (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
527 match solution.to_checksum::<N>() {
529 Ok(solution_checksum) if solution_checksum == checksum => (),
530 _ => {
531 trace!(
532 "Proposing - Skipping solution '{}' - Checksum mismatch",
533 fmt_id(solution_id)
534 );
535 continue 'inner;
536 }
537 }
538 if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
540 trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
541 continue 'inner;
542 }
543 }
544 (
545 TransmissionID::Transaction(transaction_id, checksum),
546 Transmission::Transaction(transaction),
547 ) => {
548 match transaction.to_checksum::<N>() {
550 Ok(transaction_checksum) if transaction_checksum == checksum => (),
551 _ => {
552 trace!(
553 "Proposing - Skipping transaction '{}' - Checksum mismatch",
554 fmt_id(transaction_id)
555 );
556 continue 'inner;
557 }
558 }
559 if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
561 trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
562 continue 'inner;
563 }
564 }
565 (TransmissionID::Ratification, Transmission::Ratification) => continue,
568 _ => continue 'inner,
570 }
571 transmissions.insert(id, transmission);
573 num_transmissions_included_for_worker += 1;
574 }
575 }
576 }
577
578 let current_timestamp = now();
580
581 *lock_guard = round;
582
583 info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
585
586 let private_key = *self.gateway.account().private_key();
588 let committee_id = committee_lookback.id();
590 let transmission_ids = transmissions.keys().copied().collect();
592 let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
594 let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
596 &private_key,
597 round,
598 current_timestamp,
599 committee_id,
600 transmission_ids,
601 previous_certificate_ids,
602 &mut rand::thread_rng()
603 ))
604 .and_then(|batch_header| {
605 Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
606 .map(|proposal| (batch_header, proposal))
607 })
608 .inspect_err(|_| {
609 if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
611 error!("Failed to reinsert transmissions: {e:?}");
612 }
613 })?;
614 self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
616 *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
618 *self.proposed_batch.write() = Some(proposal);
620 Ok(())
621 }
622
/// Processes a batch proposal received from a peer and, if it is acceptable, signs it
/// and sends the signature back to the peer.
///
/// The proposal is rejected with an error (and the peer is disconnected for the cases
/// logged as "Malicious peer") when the declared round does not match the header, the
/// author does not match the peer, the author is not an authorized validator, the
/// committee ID differs from the expected one, the timestamp check fails, or the header
/// contains a ratification ID. If this primary already signed the same batch, the cached
/// signature is re-sent instead of signing again.
///
/// # Errors
/// Returns an error for the rejections above, for a proposal authored by this primary,
/// for a proposal older than (or conflicting with) the last one signed for that author,
/// and for failures while deserializing, syncing, checking, or signing the batch header.
async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
    let BatchPropose { round: batch_round, batch_header } = batch_propose;

    // Deserialize the batch header on a blocking thread.
    let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
    // The round declared in the event must match the round in the header.
    if batch_round != batch_header.round() {
        self.gateway.disconnect(peer_ip);
        bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
    }

    let batch_author = batch_header.author();

    // The peer must be the author of the batch it sent.
    match self.gateway.resolver().get_address(peer_ip) {
        Some(address) => {
            if address != batch_author {
                self.gateway.disconnect(peer_ip);
                bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
            }
        }
        None => bail!("Batch proposal from a disconnected validator"),
    }
    // The author must be an authorized validator.
    if !self.gateway.is_authorized_validator_address(batch_author) {
        self.gateway.disconnect(peer_ip);
        bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
    }
    // This primary never signs its own batch through this path.
    if self.gateway.account().address() == batch_author {
        bail!("Invalid peer - proposed batch from myself ({batch_author})");
    }

    // The header's committee ID must match the committee lookback for the round.
    let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
    if expected_committee_id != batch_header.committee_id() {
        self.gateway.disconnect(peer_ip);
        bail!(
            "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
            batch_header.committee_id()
        );
    }

    // Compare against the last proposal this primary signed for the author, if any.
    if let Some((signed_round, signed_batch_id, signature)) =
        self.signed_proposals.read().get(&batch_author).copied()
    {
        // Do not sign a proposal for a round older than the last signed one.
        if signed_round > batch_header.round() {
            bail!(
                "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                batch_header.round()
            );
        }

        // Do not sign a second, different batch for the same round.
        if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
            bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
        }
        // Same round and same batch: re-send the cached signature instead of signing again.
        if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
            let gateway = self.gateway.clone();
            tokio::spawn(async move {
                debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                if gateway.send(peer_ip, event).await.is_none() {
                    warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                }
            });
            return Ok(());
        }
    }

    // Nothing to do if the batch is already in storage.
    if self.storage.contains_batch(batch_header.batch_id()) {
        debug!(
            "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
            format!("batch for round {batch_round} already exists in storage").dimmed()
        );
        return Ok(());
    }

    // Check the proposal's timestamp against the previous round.
    let previous_round = batch_round.saturating_sub(1);
    if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
        self.gateway.disconnect(peer_ip);
        bail!("Malicious peer - {e} from '{peer_ip}'");
    }

    // Ratifications are not accepted in a proposal.
    if batch_header.contains(TransmissionID::Ratification) {
        self.gateway.disconnect(peer_ip);
        bail!(
            "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
        );
    }

    // Sync with the batch header; this yields the transmissions this primary was missing.
    // NOTE(review): the meaning of the `<false>` const argument is defined outside this view.
    let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

    // Every missing transmission must be well-formed; otherwise the proposal is silently dropped.
    if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
        self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
    }) {
        debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}",);
        return Ok(());
    }

    // The proposal's round must be the round this primary is currently signing for;
    // a mismatch is not treated as an error.
    if let Err(e) = self.ensure_is_signing_round(batch_round) {
        debug!("{e} from '{peer_ip}'");
        return Ok(());
    }

    // Check the batch header against storage on a blocking thread,
    // then hand the missing transmissions to the workers.
    let (storage, header) = (self.storage.clone(), batch_header.clone());
    let missing_transmissions =
        spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
    self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

    // Sign the batch ID on a blocking thread.
    let batch_id = batch_header.batch_id();
    let account = self.gateway.account().clone();
    let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

    // Record the signature. If a signature for this author and round was recorded in the
    // meantime, return without sending: a repeated proposal takes the re-send path above.
    match self.signed_proposals.write().0.entry(batch_author) {
        std::collections::hash_map::Entry::Occupied(mut entry) => {
            if entry.get().0 == batch_round {
                return Ok(());
            }
            entry.insert((batch_round, batch_id, signature));
        }
        std::collections::hash_map::Entry::Vacant(entry) => {
            entry.insert((batch_round, batch_id, signature));
        }
    };

    // Send the signature back to the proposer in a separate task.
    let self_ = self.clone();
    tokio::spawn(async move {
        let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
        if self_.gateway.send(peer_ip, event).await.is_some() {
            debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
        }
    });
    Ok(())
}
814
815 async fn process_batch_signature_from_peer(
824 &self,
825 peer_ip: SocketAddr,
826 batch_signature: BatchSignature<N>,
827 ) -> Result<()> {
828 self.check_proposed_batch_for_expiration().await?;
830
831 let BatchSignature { batch_id, signature } = batch_signature;
833
834 let signer = signature.to_address();
836
837 if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
839 self.gateway.disconnect(peer_ip);
841 bail!("Malicious peer - batch signature is from a different validator ({signer})");
842 }
843 if self.gateway.account().address() == signer {
845 bail!("Invalid peer - received a batch signature from myself ({signer})");
846 }
847
848 let self_ = self.clone();
849 let Some(proposal) = spawn_blocking!({
850 let mut proposed_batch = self_.proposed_batch.write();
852 match proposed_batch.as_mut() {
854 Some(proposal) => {
855 if proposal.batch_id() != batch_id {
857 match self_.storage.contains_batch(batch_id) {
858 true => {
860 debug!(
861 "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
862 proposal.round()
863 );
864 return Ok(None);
865 }
866 false => bail!(
868 "Unknown batch ID '{batch_id}', expected '{}' for round {}",
869 proposal.batch_id(),
870 proposal.round()
871 ),
872 }
873 }
874 let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
876 let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
878 bail!("Signature is from a disconnected validator");
879 };
880 proposal.add_signature(signer, signature, &committee_lookback)?;
882 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
883 if !proposal.is_quorum_threshold_reached(&committee_lookback) {
885 return Ok(None);
887 }
888 }
889 None => return Ok(None),
891 };
892 match proposed_batch.take() {
894 Some(proposal) => Ok(Some(proposal)),
895 None => Ok(None),
896 }
897 })?
898 else {
899 return Ok(());
900 };
901
902 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
905
906 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
908 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
911 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
913 return Err(e);
914 }
915
916 #[cfg(feature = "metrics")]
917 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
918 Ok(())
919 }
920
921 async fn process_batch_certificate_from_peer(
928 &self,
929 peer_ip: SocketAddr,
930 certificate: BatchCertificate<N>,
931 ) -> Result<()> {
932 if !self.gateway.is_authorized_validator_ip(peer_ip) {
934 self.gateway.disconnect(peer_ip);
936 bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
937 }
938 if self.storage.contains_certificate(certificate.id()) {
940 return Ok(());
941 } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
943 self.storage.insert_unprocessed_certificate(certificate.clone())?;
944 }
945
946 let author = certificate.author();
948 let certificate_round = certificate.round();
950 let committee_id = certificate.committee_id();
952
953 if self.gateway.account().address() == author {
955 bail!("Received a batch certificate for myself ({author})");
956 }
957
958 self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
960
961 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
966 let authors = self.storage.get_certificate_authors_for_round(certificate_round);
968 let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
970
971 let expected_committee_id = committee_lookback.id();
973 if expected_committee_id != committee_id {
974 self.gateway.disconnect(peer_ip);
976 bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
977 }
978
979 let should_advance = match &*self.proposed_batch.read() {
983 Some(proposal) => proposal.round() < certificate_round,
985 None => true,
987 };
988
989 let current_round = self.current_round();
991
992 if is_quorum && should_advance && certificate_round >= current_round {
994 self.try_increment_to_the_next_round(current_round + 1).await?;
996 }
997 Ok(())
998 }
999}
1000
1001impl<N: Network> Primary<N> {
/// Starts the primary's background tasks via `self.spawn`:
/// - a periodic primary ping broadcast,
/// - a receiver for primary pings from peers,
/// - a periodic worker ping broadcast,
/// - a periodic batch proposer,
/// - receivers for batch proposals, batch signatures, and batch certificates from peers,
/// - a periodic round incrementer,
/// - receivers for unconfirmed solutions and unconfirmed transactions.
///
/// Each task works on its own clone of the primary. Peer-facing tasks skip their work
/// while the node is not synced.
fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
    let PrimaryReceiver {
        mut rx_batch_propose,
        mut rx_batch_signature,
        mut rx_batch_certified,
        mut rx_primary_ping,
        mut rx_unconfirmed_solution,
        mut rx_unconfirmed_transaction,
    } = primary_receiver;

    // Periodically broadcast a primary ping with the block locators
    // and this primary's latest certificate.
    let self_ = self.clone();
    self.spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

            // Retrieve the block locators on a blocking thread.
            let self__ = self_.clone();
            let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                Ok(block_locators) => block_locators,
                Err(e) => {
                    warn!("Failed to retrieve block locators - {e}");
                    continue;
                }
            };

            // Find this primary's most recent certificate, searching from the current round downwards.
            let primary_certificate = {
                let primary_address = self_.gateway.account().address();

                let mut certificate = None;
                let mut current_round = self_.current_round();
                while certificate.is_none() {
                    // Stop searching once round 0 is reached.
                    if current_round == 0 {
                        break;
                    }
                    if let Some(primary_certificate) =
                        self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                    {
                        certificate = Some(primary_certificate);
                    } else {
                        current_round = current_round.saturating_sub(1);
                    }
                }

                // Without a certificate, skip the ping for this interval.
                match certificate {
                    Some(certificate) => certificate,
                    None => continue,
                }
            };

            let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
            self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
        }
    });

    // Process the primary certificate carried by each primary ping received from a peer.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
            if !self_.sync.is_synced() {
                trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                continue;
            }

            // Handle each ping in its own task, so the receive loop is not blocked.
            {
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                    else {
                        warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                        return;
                    };
                    // Capture the ID and round before the certificate is moved.
                    let id = fmt_id(primary_certificate.id());
                    let round = primary_certificate.round();
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                        warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                    }
                });
            }
        }
    });

    // Periodically have every worker broadcast a ping.
    let self_ = self.clone();
    self.spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
            if !self_.sync.is_synced() {
                trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                continue;
            }
            for worker in self_.workers.iter() {
                worker.broadcast_ping();
            }
        }
    });

    // Periodically attempt to propose a batch.
    let self_ = self.clone();
    self.spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
            let current_round = self_.current_round();
            if !self_.sync.is_synced() {
                debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                continue;
            }
            // Skip this interval if a proposal is already in progress.
            // The guard returned by `try_lock` is released right away; `propose_batch` re-acquires the lock.
            if self_.propose_lock.try_lock().is_err() {
                trace!(
                    "Skipping batch proposal for round {current_round} {}",
                    "(node is already proposing)".dimmed()
                );
                continue;
            };
            if let Err(e) = self_.propose_batch().await {
                warn!("Cannot propose a batch - {e}");
            }
        }
    });

    // Process batch proposals received from peers, each in its own task.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
            if !self_.sync.is_synced() {
                trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                continue;
            }
            let self_ = self_.clone();
            tokio::spawn(async move {
                // Capture the round before the proposal is moved.
                let round = batch_propose.round;
                if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                    warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                }
            });
        }
    });

    // Process batch signatures received from peers, one at a time within this task.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
            if !self_.sync.is_synced() {
                trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                continue;
            }
            // Capture the batch ID before the signature is moved.
            let id = fmt_id(batch_signature.batch_id);
            if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
            }
        }
    });

    // Process batch certificates received from peers, each in its own task.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
            if !self_.sync.is_synced() {
                trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                continue;
            }
            let self_ = self_.clone();
            tokio::spawn(async move {
                let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                    warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                    return;
                };
                // Capture the ID and round before the certificate is moved.
                let id = fmt_id(batch_certificate.id());
                let round = batch_certificate.round();
                if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                    warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
                }
            });
        }
    });

    // Periodically check whether the next round has already reached quorum in storage,
    // and if so, advance to it.
    let self_ = self.clone();
    self.spawn(async move {
        loop {
            tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
            if !self_.sync.is_synced() {
                trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                continue;
            }
            let next_round = self_.current_round().saturating_add(1);
            let is_quorum_threshold_reached = {
                let authors = self_.storage.get_certificate_authors_for_round(next_round);
                // Without any certificate for the next round, there is nothing to check.
                if authors.is_empty() {
                    continue;
                }
                let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                    warn!("Failed to retrieve the committee lookback for round {next_round}");
                    continue;
                };
                committee_lookback.is_quorum_threshold_reached(&authors)
            };
            if is_quorum_threshold_reached {
                debug!("Quorum threshold reached for round {}", next_round);
                if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                    warn!("Failed to increment to the next round - {e}");
                }
            }
        }
    });

    // Route each unconfirmed solution to its assigned worker, and report the result via the callback.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
            let Ok(checksum) = solution.to_checksum::<N>() else {
                error!("Failed to compute the checksum for the unconfirmed solution");
                continue;
            };
            let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                error!("Unable to determine the worker ID for the unconfirmed solution");
                continue;
            };
            let self_ = self_.clone();
            tokio::spawn(async move {
                let worker = &self_.workers[worker_id as usize];
                let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                // A failed send is ignored (the result is discarded via `.ok()`).
                callback.send(result).ok();
            });
        }
    });

    // Route each unconfirmed transaction to its assigned worker, and report the result via the callback.
    let self_ = self.clone();
    self.spawn(async move {
        while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
            trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
            let Ok(checksum) = transaction.to_checksum::<N>() else {
                error!("Failed to compute the checksum for the unconfirmed transaction");
                continue;
            };
            let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                error!("Unable to determine the worker ID for the unconfirmed transaction");
                continue;
            };
            let self_ = self_.clone();
            tokio::spawn(async move {
                let worker = &self_.workers[worker_id as usize];
                let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                // A failed send is ignored (the result is discarded via `.ok()`).
                callback.send(result).ok();
            });
        }
    });
}
1308
1309 async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1311 let is_expired = match self.proposed_batch.read().as_ref() {
1313 Some(proposal) => proposal.round() < self.current_round(),
1314 None => false,
1315 };
1316 if is_expired {
1318 let proposal = self.proposed_batch.write().take();
1320 if let Some(proposal) = proposal {
1321 debug!("Cleared expired proposal for round {}", proposal.round());
1322 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1323 }
1324 }
1325 Ok(())
1326 }
1327
1328 async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1330 if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1332 let mut fast_forward_round = self.current_round();
1333 while fast_forward_round < next_round.saturating_sub(1) {
1335 fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1337 *self.proposed_batch.write() = None;
1339 }
1340 }
1341
1342 let current_round = self.current_round();
1344 if current_round < next_round {
1346 let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1348 match bft_sender.send_primary_round_to_bft(current_round).await {
1349 Ok(is_ready) => is_ready,
1350 Err(e) => {
1351 warn!("Failed to update the BFT to the next round - {e}");
1352 return Err(e);
1353 }
1354 }
1355 }
1356 else {
1358 self.storage.increment_to_next_round(current_round)?;
1360 true
1362 };
1363
1364 match is_ready {
1366 true => debug!("Primary is ready to propose the next round"),
1367 false => debug!("Primary is not ready to propose the next round"),
1368 }
1369
1370 if is_ready {
1372 self.propose_batch().await?;
1373 }
1374 }
1375 Ok(())
1376 }
1377
1378 fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1382 let current_round = self.current_round();
1384 if current_round + self.storage.max_gc_rounds() <= batch_round {
1386 bail!("Round {batch_round} is too far in the future")
1387 }
1388 if current_round > batch_round + 1 {
1392 bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1393 }
1394 if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1396 if signing_round > batch_round {
1397 bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1398 }
1399 }
1400 Ok(())
1401 }
1402
1403 fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1406 let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1408 Some(certificate) => certificate.timestamp(),
1410 None => match self.gateway.account().address() == author {
1411 true => *self.latest_proposed_batch_timestamp.read(),
1413 false => return Ok(()),
1415 },
1416 };
1417
1418 let elapsed = timestamp
1420 .checked_sub(previous_timestamp)
1421 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1422 match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1424 true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1425 false => Ok(()),
1426 }
1427 }
1428
1429 async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1431 let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1433 let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1436 let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1438 spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1439 debug!("Stored a batch certificate for round {}", certificate.round());
1440 if let Some(bft_sender) = self.bft_sender.get() {
1442 if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1444 warn!("Failed to update the BFT DAG from primary - {e}");
1445 return Err(e);
1446 };
1447 }
1448 self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1450 let num_transmissions = certificate.transmission_ids().len();
1452 let round = certificate.round();
1453 info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1454 self.try_increment_to_the_next_round(round + 1).await
1456 }
1457
1458 fn insert_missing_transmissions_into_workers(
1460 &self,
1461 peer_ip: SocketAddr,
1462 transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1463 ) -> Result<()> {
1464 assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1466 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1467 })
1468 }
1469
1470 fn reinsert_transmissions_into_workers(
1472 &self,
1473 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1474 ) -> Result<()> {
1475 assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1477 worker.reinsert(transmission_id, transmission);
1478 })
1479 }
1480
1481 #[async_recursion::async_recursion]
1491 async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1492 &self,
1493 peer_ip: SocketAddr,
1494 certificate: BatchCertificate<N>,
1495 ) -> Result<()> {
1496 let batch_header = certificate.batch_header();
1498 let batch_round = batch_header.round();
1500
1501 if batch_round <= self.storage.gc_round() {
1503 return Ok(());
1504 }
1505 if self.storage.contains_certificate(certificate.id()) {
1507 return Ok(());
1508 }
1509
1510 if !IS_SYNCING && !self.is_synced() {
1512 bail!(
1513 "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1514 fmt_id(certificate.id())
1515 );
1516 }
1517
1518 let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1520
1521 if !self.storage.contains_certificate(certificate.id()) {
1523 let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1525 spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1526 debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1527 if let Some(bft_sender) = self.bft_sender.get() {
1529 if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1531 warn!("Failed to update the BFT DAG from sync: {e}");
1532 return Err(e);
1533 };
1534 }
1535 }
1536 Ok(())
1537 }
1538
1539 async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1541 &self,
1542 peer_ip: SocketAddr,
1543 batch_header: &BatchHeader<N>,
1544 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1545 let batch_round = batch_header.round();
1547
1548 if batch_round <= self.storage.gc_round() {
1550 bail!("Round {batch_round} is too far in the past")
1551 }
1552
1553 if !IS_SYNCING && !self.is_synced() {
1555 bail!(
1556 "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1557 fmt_id(batch_header.batch_id())
1558 );
1559 }
1560
1561 let is_quorum_threshold_reached = {
1563 let authors = self.storage.get_certificate_authors_for_round(batch_round);
1564 let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1565 committee_lookback.is_quorum_threshold_reached(&authors)
1566 };
1567
1568 let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1573 let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1575 if is_behind_schedule || is_peer_far_in_future {
1577 self.try_increment_to_the_next_round(batch_round).await?;
1579 }
1580
1581 let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1583
1584 let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1586
1587 let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1589 missing_transmissions_handle,
1590 missing_previous_certificates_handle,
1591 ).map_err(|e| {
1592 anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
1593 })?;
1594
1595 for batch_certificate in missing_previous_certificates {
1597 self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1599 }
1600 Ok(missing_transmissions)
1601 }
1602
1603 async fn fetch_missing_transmissions(
1606 &self,
1607 peer_ip: SocketAddr,
1608 batch_header: &BatchHeader<N>,
1609 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1610 if batch_header.round() <= self.storage.gc_round() {
1612 return Ok(Default::default());
1613 }
1614
1615 if self.storage.contains_batch(batch_header.batch_id()) {
1617 trace!("Batch for round {} from peer has already been processed", batch_header.round());
1618 return Ok(Default::default());
1619 }
1620
1621 let workers = self.workers.clone();
1623
1624 let mut fetch_transmissions = FuturesUnordered::new();
1626
1627 let num_workers = self.num_workers();
1629 for transmission_id in batch_header.transmission_ids() {
1631 if !self.storage.contains_transmission(*transmission_id) {
1633 let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1635 bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1636 };
1637 let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1639 fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1641 }
1642 }
1643
1644 let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1646 while let Some(result) = fetch_transmissions.next().await {
1648 let (transmission_id, transmission) = result?;
1650 transmissions.insert(transmission_id, transmission);
1652 }
1653 Ok(transmissions)
1655 }
1656
1657 async fn fetch_missing_previous_certificates(
1659 &self,
1660 peer_ip: SocketAddr,
1661 batch_header: &BatchHeader<N>,
1662 ) -> Result<HashSet<BatchCertificate<N>>> {
1663 let round = batch_header.round();
1665 if round == 1 || round <= self.storage.gc_round() + 1 {
1667 return Ok(Default::default());
1668 }
1669
1670 let missing_previous_certificates =
1672 self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1673 if !missing_previous_certificates.is_empty() {
1674 debug!(
1675 "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1676 missing_previous_certificates.len(),
1677 );
1678 }
1679 Ok(missing_previous_certificates)
1681 }
1682
1683 async fn fetch_missing_certificates(
1685 &self,
1686 peer_ip: SocketAddr,
1687 round: u64,
1688 certificate_ids: &IndexSet<Field<N>>,
1689 ) -> Result<HashSet<BatchCertificate<N>>> {
1690 let mut fetch_certificates = FuturesUnordered::new();
1692 let mut missing_certificates = HashSet::default();
1694 for certificate_id in certificate_ids {
1696 if self.ledger.contains_certificate(certificate_id)? {
1698 continue;
1699 }
1700 if self.storage.contains_certificate(*certificate_id) {
1702 continue;
1703 }
1704 if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1706 missing_certificates.insert(certificate);
1707 } else {
1708 trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1710 fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1713 }
1714 }
1715
1716 match fetch_certificates.is_empty() {
1718 true => return Ok(missing_certificates),
1719 false => trace!(
1720 "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1721 fetch_certificates.len(),
1722 ),
1723 }
1724
1725 while let Some(result) = fetch_certificates.next().await {
1727 missing_certificates.insert(result?);
1729 }
1730 Ok(missing_certificates)
1732 }
1733}
1734
1735impl<N: Network> Primary<N> {
1736 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1738 self.handles.lock().push(tokio::spawn(future));
1739 }
1740
1741 pub async fn shut_down(&self) {
1743 info!("Shutting down the primary...");
1744 self.workers.iter().for_each(|worker| worker.shut_down());
1746 self.handles.lock().iter().for_each(|handle| handle.abort());
1748 let proposal_cache = {
1750 let proposal = self.proposed_batch.write().take();
1751 let signed_proposals = self.signed_proposals.read().clone();
1752 let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1753 let pending_certificates = self.storage.get_pending_certificates();
1754 ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1755 };
1756 if let Err(err) = proposal_cache.store(self.gateway.dev()) {
1757 error!("Failed to store the current proposal cache: {err}");
1758 }
1759 self.gateway.shut_down().await;
1761 }
1762}
1763
1764#[cfg(test)]
1765mod tests {
1766 use super::*;
1767 use snarkos_node_bft_ledger_service::MockLedgerService;
1768 use snarkos_node_bft_storage_service::BFTMemoryService;
1769 use snarkvm::{
1770 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1771 prelude::{Address, Signature},
1772 };
1773
1774 use bytes::Bytes;
1775 use indexmap::IndexSet;
1776 use rand::RngCore;
1777
1778 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1779
1780 async fn primary_without_handlers(
1782 rng: &mut TestRng,
1783 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1784 let (accounts, committee) = {
1786 const COMMITTEE_SIZE: usize = 4;
1787 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1788 let mut members = IndexMap::new();
1789
1790 for i in 0..COMMITTEE_SIZE {
1791 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1792 let account = Account::new(rng).unwrap();
1793 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1794 accounts.push((socket_addr, account));
1795 }
1796
1797 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1798 };
1799
1800 let account = accounts.first().unwrap().1.clone();
1801 let ledger = Arc::new(MockLedgerService::new(committee));
1802 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1803
1804 let mut primary = Primary::new(account, storage, ledger, None, &[], None).unwrap();
1806
1807 primary.workers = Arc::from([Worker::new(
1809 0, Arc::new(primary.gateway.clone()),
1811 primary.storage.clone(),
1812 primary.ledger.clone(),
1813 primary.proposed_batch.clone(),
1814 )
1815 .unwrap()]);
1816 for a in accounts.iter() {
1817 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1818 }
1819
1820 (primary, accounts)
1821 }
1822
1823 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1825 let solution_id = rng.gen::<u64>().into();
1827 let size = rng.gen_range(1024..10 * 1024);
1829 let mut vec = vec![0u8; size];
1831 rng.fill_bytes(&mut vec);
1832 let solution = Data::Buffer(Bytes::from(vec));
1833 (solution_id, solution)
1835 }
1836
1837 fn sample_unconfirmed_transaction(
1839 rng: &mut TestRng,
1840 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1841 let id = Field::<CurrentNetwork>::rand(rng).into();
1843 let size = rng.gen_range(1024..10 * 1024);
1845 let mut vec = vec![0u8; size];
1847 rng.fill_bytes(&mut vec);
1848 let transaction = Data::Buffer(Bytes::from(vec));
1849 (id, transaction)
1851 }
1852
1853 fn create_test_proposal(
1855 author: &Account<CurrentNetwork>,
1856 committee: Committee<CurrentNetwork>,
1857 round: u64,
1858 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1859 timestamp: i64,
1860 rng: &mut TestRng,
1861 ) -> Proposal<CurrentNetwork> {
1862 let (solution_id, solution) = sample_unconfirmed_solution(rng);
1863 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1864 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1865 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1866
1867 let solution_transmission_id = (solution_id, solution_checksum).into();
1868 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1869
1870 let private_key = author.private_key();
1872 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1874 let transmissions = [
1875 (solution_transmission_id, Transmission::Solution(solution)),
1876 (transaction_transmission_id, Transmission::Transaction(transaction)),
1877 ]
1878 .into();
1879 let batch_header = BatchHeader::new(
1881 private_key,
1882 round,
1883 timestamp,
1884 committee.id(),
1885 transmission_ids,
1886 previous_certificate_ids,
1887 rng,
1888 )
1889 .unwrap();
1890 Proposal::new(committee, batch_header, transmissions).unwrap()
1892 }
1893
1894 fn peer_signatures_for_proposal(
1897 primary: &Primary<CurrentNetwork>,
1898 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1899 rng: &mut TestRng,
1900 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
1901 let mut signatures = Vec::with_capacity(accounts.len() - 1);
1903 for (socket_addr, account) in accounts {
1904 if account.address() == primary.gateway.account().address() {
1905 continue;
1906 }
1907 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
1908 let signature = account.sign(&[batch_id], rng).unwrap();
1909 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
1910 }
1911
1912 signatures
1913 }
1914
1915 fn peer_signatures_for_batch(
1917 primary_address: Address<CurrentNetwork>,
1918 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1919 batch_id: Field<CurrentNetwork>,
1920 rng: &mut TestRng,
1921 ) -> IndexSet<Signature<CurrentNetwork>> {
1922 let mut signatures = IndexSet::new();
1923 for (_, account) in accounts {
1924 if account.address() == primary_address {
1925 continue;
1926 }
1927 let signature = account.sign(&[batch_id], rng).unwrap();
1928 signatures.insert(signature);
1929 }
1930 signatures
1931 }
1932
1933 fn create_batch_certificate(
1935 primary_address: Address<CurrentNetwork>,
1936 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1937 round: u64,
1938 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1939 rng: &mut TestRng,
1940 ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
1941 let timestamp = now();
1942
1943 let author =
1944 accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
1945 let private_key = author.private_key();
1946
1947 let committee_id = Field::rand(rng);
1948 let (solution_id, solution) = sample_unconfirmed_solution(rng);
1949 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1950 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1951 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1952
1953 let solution_transmission_id = (solution_id, solution_checksum).into();
1954 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1955
1956 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1957 let transmissions = [
1958 (solution_transmission_id, Transmission::Solution(solution)),
1959 (transaction_transmission_id, Transmission::Transaction(transaction)),
1960 ]
1961 .into();
1962
1963 let batch_header = BatchHeader::new(
1964 private_key,
1965 round,
1966 timestamp,
1967 committee_id,
1968 transmission_ids,
1969 previous_certificate_ids,
1970 rng,
1971 )
1972 .unwrap();
1973 let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
1974 let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
1975 (certificate, transmissions)
1976 }
1977
1978 fn store_certificate_chain(
1980 primary: &Primary<CurrentNetwork>,
1981 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1982 round: u64,
1983 rng: &mut TestRng,
1984 ) -> IndexSet<Field<CurrentNetwork>> {
1985 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1986 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1987 for cur_round in 1..round {
1988 for (_, account) in accounts.iter() {
1989 let (certificate, transmissions) = create_batch_certificate(
1990 account.address(),
1991 accounts,
1992 cur_round,
1993 previous_certificates.clone(),
1994 rng,
1995 );
1996 next_certificates.insert(certificate.id());
1997 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1998 }
1999
2000 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2001 previous_certificates = next_certificates;
2002 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2003 }
2004
2005 previous_certificates
2006 }
2007
2008 fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2011 for (addr, acct) in accounts.iter().skip(1) {
2013 primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2014 }
2015 }
2016
2017 #[tokio::test]
2018 async fn test_propose_batch() {
2019 let mut rng = TestRng::default();
2020 let (primary, _) = primary_without_handlers(&mut rng).await;
2021
2022 assert!(primary.proposed_batch.read().is_none());
2024
2025 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2027 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2028
2029 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2031 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2032
2033 assert!(primary.propose_batch().await.is_ok());
2035 assert!(primary.proposed_batch.read().is_some());
2036 }
2037
2038 #[tokio::test]
2039 async fn test_propose_batch_with_no_transmissions() {
2040 let mut rng = TestRng::default();
2041 let (primary, _) = primary_without_handlers(&mut rng).await;
2042
2043 assert!(primary.proposed_batch.read().is_none());
2045
2046 assert!(primary.propose_batch().await.is_ok());
2048 assert!(primary.proposed_batch.read().is_some());
2049 }
2050
2051 #[tokio::test]
2052 async fn test_propose_batch_in_round() {
2053 let round = 3;
2054 let mut rng = TestRng::default();
2055 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2056
2057 store_certificate_chain(&primary, &accounts, round, &mut rng);
2059
2060 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2062
2063 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2065 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2066
2067 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2069 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2070
2071 assert!(primary.propose_batch().await.is_ok());
2073 assert!(primary.proposed_batch.read().is_some());
2074 }
2075
2076 #[tokio::test]
2077 async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2078 let round = 3;
2079 let prev_round = round - 1;
2080 let mut rng = TestRng::default();
2081 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2082 let peer_account = &accounts[1];
2083 let peer_ip = peer_account.0;
2084
2085 store_certificate_chain(&primary, &accounts, round, &mut rng);
2087
2088 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2090
2091 let mut num_transmissions_in_previous_round = 0;
2093
2094 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2096 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2097 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2098 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2099
2100 primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2102 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2103
2104 assert_eq!(primary.workers[0].num_transmissions(), 2);
2106
2107 for (_, account) in accounts.iter() {
2109 let (certificate, transmissions) = create_batch_certificate(
2110 account.address(),
2111 &accounts,
2112 round,
2113 previous_certificate_ids.clone(),
2114 &mut rng,
2115 );
2116
2117 for (transmission_id, transmission) in transmissions.iter() {
2119 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2120 }
2121
2122 num_transmissions_in_previous_round += transmissions.len();
2124 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2125 }
2126
2127 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2129
2130 assert!(primary.storage.increment_to_next_round(round).is_ok());
2132
2133 assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2135
2136 assert!(primary.propose_batch().await.is_ok());
2138
2139 let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2141 assert_eq!(proposed_transmissions.len(), 2);
2142 assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2143 assert!(
2144 proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2145 );
2146 }
2147
2148 #[tokio::test]
2149 async fn test_batch_propose_from_peer() {
2150 let mut rng = TestRng::default();
2151 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2152
2153 let round = 1;
2155 let peer_account = &accounts[1];
2156 let peer_ip = peer_account.0;
2157 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2158 let proposal = create_test_proposal(
2159 &peer_account.1,
2160 primary.ledger.current_committee().unwrap(),
2161 round,
2162 Default::default(),
2163 timestamp,
2164 &mut rng,
2165 );
2166
2167 for (transmission_id, transmission) in proposal.transmissions() {
2169 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2170 }
2171
2172 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2174 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2176
2177 assert!(
2179 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2180 );
2181 }
2182
2183 #[tokio::test]
2184 async fn test_batch_propose_from_peer_when_not_synced() {
2185 let mut rng = TestRng::default();
2186 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2187
2188 let round = 1;
2190 let peer_account = &accounts[1];
2191 let peer_ip = peer_account.0;
2192 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2193 let proposal = create_test_proposal(
2194 &peer_account.1,
2195 primary.ledger.current_committee().unwrap(),
2196 round,
2197 Default::default(),
2198 timestamp,
2199 &mut rng,
2200 );
2201
2202 for (transmission_id, transmission) in proposal.transmissions() {
2204 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2205 }
2206
2207 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2209
2210 assert!(
2212 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2213 );
2214 }
2215
2216 #[tokio::test]
2217 async fn test_batch_propose_from_peer_in_round() {
2218 let round = 2;
2219 let mut rng = TestRng::default();
2220 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2221
2222 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2224
2225 let peer_account = &accounts[1];
2227 let peer_ip = peer_account.0;
2228 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2229 let proposal = create_test_proposal(
2230 &peer_account.1,
2231 primary.ledger.current_committee().unwrap(),
2232 round,
2233 previous_certificates,
2234 timestamp,
2235 &mut rng,
2236 );
2237
2238 for (transmission_id, transmission) in proposal.transmissions() {
2240 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2241 }
2242
2243 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2245 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2247
2248 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2250 }
2251
2252 #[tokio::test]
2253 async fn test_batch_propose_from_peer_wrong_round() {
2254 let mut rng = TestRng::default();
2255 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2256
2257 let round = 1;
2259 let peer_account = &accounts[1];
2260 let peer_ip = peer_account.0;
2261 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2262 let proposal = create_test_proposal(
2263 &peer_account.1,
2264 primary.ledger.current_committee().unwrap(),
2265 round,
2266 Default::default(),
2267 timestamp,
2268 &mut rng,
2269 );
2270
2271 for (transmission_id, transmission) in proposal.transmissions() {
2273 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2274 }
2275
2276 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2278 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2280
2281 assert!(
2283 primary
2284 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2285 round: round + 1,
2286 batch_header: Data::Object(proposal.batch_header().clone())
2287 })
2288 .await
2289 .is_err()
2290 );
2291 }
2292
2293 #[tokio::test]
2294 async fn test_batch_propose_from_peer_in_round_wrong_round() {
2295 let round = 4;
2296 let mut rng = TestRng::default();
2297 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2298
2299 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2301
2302 let peer_account = &accounts[1];
2304 let peer_ip = peer_account.0;
2305 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2306 let proposal = create_test_proposal(
2307 &peer_account.1,
2308 primary.ledger.current_committee().unwrap(),
2309 round,
2310 previous_certificates,
2311 timestamp,
2312 &mut rng,
2313 );
2314
2315 for (transmission_id, transmission) in proposal.transmissions() {
2317 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2318 }
2319
2320 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2322 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2324
2325 assert!(
2327 primary
2328 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2329 round: round + 1,
2330 batch_header: Data::Object(proposal.batch_header().clone())
2331 })
2332 .await
2333 .is_err()
2334 );
2335 }
2336
2337 #[tokio::test]
2338 async fn test_batch_propose_from_peer_with_invalid_timestamp() {
2339 let round = 2;
2340 let mut rng = TestRng::default();
2341 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2342
2343 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2345
2346 let peer_account = &accounts[1];
2348 let peer_ip = peer_account.0;
2349 let invalid_timestamp = now(); let proposal = create_test_proposal(
2351 &peer_account.1,
2352 primary.ledger.current_committee().unwrap(),
2353 round,
2354 previous_certificates,
2355 invalid_timestamp,
2356 &mut rng,
2357 );
2358
2359 for (transmission_id, transmission) in proposal.transmissions() {
2361 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2362 }
2363
2364 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2366 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2368
2369 assert!(
2371 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2372 );
2373 }
2374
2375 #[tokio::test]
2376 async fn test_batch_propose_from_peer_with_past_timestamp() {
2377 let round = 2;
2378 let mut rng = TestRng::default();
2379 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2380
2381 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2383
2384 let peer_account = &accounts[1];
2386 let peer_ip = peer_account.0;
2387 let past_timestamp = now() - 5; let proposal = create_test_proposal(
2389 &peer_account.1,
2390 primary.ledger.current_committee().unwrap(),
2391 round,
2392 previous_certificates,
2393 past_timestamp,
2394 &mut rng,
2395 );
2396
2397 for (transmission_id, transmission) in proposal.transmissions() {
2399 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2400 }
2401
2402 primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2404 primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2406
2407 assert!(
2409 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2410 );
2411 }
2412
2413 #[tokio::test]
2414 async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2415 let round = 3;
2416 let mut rng = TestRng::default();
2417 let (primary, _) = primary_without_handlers(&mut rng).await;
2418
2419 assert!(primary.proposed_batch.read().is_none());
2421
2422 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2424 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2425
2426 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2428 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2429
2430 let old_proposal_lock_round = *primary.propose_lock.lock().await;
2432 *primary.propose_lock.lock().await = round + 1;
2433
2434 assert!(primary.propose_batch().await.is_ok());
2436 assert!(primary.proposed_batch.read().is_none());
2437
2438 *primary.propose_lock.lock().await = old_proposal_lock_round;
2440
2441 assert!(primary.propose_batch().await.is_ok());
2443 assert!(primary.proposed_batch.read().is_some());
2444 }
2445
2446 #[tokio::test]
2447 async fn test_propose_batch_with_storage_round_behind_proposal() {
2448 let round = 5;
2449 let mut rng = TestRng::default();
2450 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2451
2452 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2454
2455 let timestamp = now();
2457 let proposal = create_test_proposal(
2458 primary.gateway.account(),
2459 primary.ledger.current_committee().unwrap(),
2460 round + 1,
2461 previous_certificates,
2462 timestamp,
2463 &mut rng,
2464 );
2465
2466 *primary.proposed_batch.write() = Some(proposal);
2468
2469 assert!(primary.propose_batch().await.is_ok());
2471 assert!(primary.proposed_batch.read().is_some());
2472 assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2473 }
2474
2475 #[tokio::test(flavor = "multi_thread")]
2476 async fn test_batch_signature_from_peer() {
2477 let mut rng = TestRng::default();
2478 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2479 map_account_addresses(&primary, &accounts);
2480
2481 let round = 1;
2483 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2484 let proposal = create_test_proposal(
2485 primary.gateway.account(),
2486 primary.ledger.current_committee().unwrap(),
2487 round,
2488 Default::default(),
2489 timestamp,
2490 &mut rng,
2491 );
2492
2493 *primary.proposed_batch.write() = Some(proposal);
2495
2496 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2498
2499 for (socket_addr, signature) in signatures {
2501 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2502 }
2503
2504 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2506 assert_eq!(primary.current_round(), round + 1);
2508 }
2509
2510 #[tokio::test(flavor = "multi_thread")]
2511 async fn test_batch_signature_from_peer_in_round() {
2512 let round = 5;
2513 let mut rng = TestRng::default();
2514 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2515 map_account_addresses(&primary, &accounts);
2516
2517 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2519
2520 let timestamp = now();
2522 let proposal = create_test_proposal(
2523 primary.gateway.account(),
2524 primary.ledger.current_committee().unwrap(),
2525 round,
2526 previous_certificates,
2527 timestamp,
2528 &mut rng,
2529 );
2530
2531 *primary.proposed_batch.write() = Some(proposal);
2533
2534 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2536
2537 for (socket_addr, signature) in signatures {
2539 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2540 }
2541
2542 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2544 assert_eq!(primary.current_round(), round + 1);
2546 }
2547
2548 #[tokio::test]
2549 async fn test_batch_signature_from_peer_no_quorum() {
2550 let mut rng = TestRng::default();
2551 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2552 map_account_addresses(&primary, &accounts);
2553
2554 let round = 1;
2556 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2557 let proposal = create_test_proposal(
2558 primary.gateway.account(),
2559 primary.ledger.current_committee().unwrap(),
2560 round,
2561 Default::default(),
2562 timestamp,
2563 &mut rng,
2564 );
2565
2566 *primary.proposed_batch.write() = Some(proposal);
2568
2569 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2571
2572 let (socket_addr, signature) = signatures.first().unwrap();
2574 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2575
2576 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2578 assert_eq!(primary.current_round(), round);
2580 }
2581
2582 #[tokio::test]
2583 async fn test_batch_signature_from_peer_in_round_no_quorum() {
2584 let round = 7;
2585 let mut rng = TestRng::default();
2586 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2587 map_account_addresses(&primary, &accounts);
2588
2589 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2591
2592 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2594 let proposal = create_test_proposal(
2595 primary.gateway.account(),
2596 primary.ledger.current_committee().unwrap(),
2597 round,
2598 previous_certificates,
2599 timestamp,
2600 &mut rng,
2601 );
2602
2603 *primary.proposed_batch.write() = Some(proposal);
2605
2606 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2608
2609 let (socket_addr, signature) = signatures.first().unwrap();
2611 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2612
2613 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2615 assert_eq!(primary.current_round(), round);
2617 }
2618
2619 #[tokio::test]
2620 async fn test_insert_certificate_with_aborted_transmissions() {
2621 let round = 3;
2622 let prev_round = round - 1;
2623 let mut rng = TestRng::default();
2624 let (primary, accounts) = primary_without_handlers(&mut rng).await;
2625 let peer_account = &accounts[1];
2626 let peer_ip = peer_account.0;
2627
2628 store_certificate_chain(&primary, &accounts, round, &mut rng);
2630
2631 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2633
2634 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2636 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2637
2638 primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2640 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2641
2642 assert_eq!(primary.workers[0].num_transmissions(), 2);
2644
2645 let account = accounts[0].1.clone();
2647 let (certificate, transmissions) =
2648 create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
2649 let certificate_id = certificate.id();
2650
2651 let mut aborted_transmissions = HashSet::new();
2653 let mut transmissions_without_aborted = HashMap::new();
2654 for (transmission_id, transmission) in transmissions.clone() {
2655 match rng.gen::<bool>() || aborted_transmissions.is_empty() {
2656 true => {
2657 aborted_transmissions.insert(transmission_id);
2659 }
2660 false => {
2661 transmissions_without_aborted.insert(transmission_id, transmission);
2663 }
2664 };
2665 }
2666
2667 for (transmission_id, transmission) in transmissions_without_aborted.iter() {
2669 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2670 }
2671
2672 assert!(
2674 primary
2675 .storage
2676 .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
2677 .is_err()
2678 );
2679 assert!(
2680 primary
2681 .storage
2682 .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
2683 .is_err()
2684 );
2685
2686 primary
2688 .storage
2689 .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
2690 .unwrap();
2691
2692 assert!(primary.storage.contains_certificate(certificate_id));
2694 for aborted_transmission_id in aborted_transmissions {
2696 assert!(primary.storage.contains_transmission(aborted_transmission_id));
2697 assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
2698 }
2699 }
2700}