use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::DUMMY_SELF_IP;
use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::committee::Committee,
};

use aleo_std::StorageMode;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

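/// A shared handle to the batch currently being proposed, if one exists.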
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch currently being proposed, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recently proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned task handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock guarding `propose_batch`, holding the latest proposed round.
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode.
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to tolerate (2x the per-batch cap).
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

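    /// Initializes a new primary instance.
    ///
    /// A minimal construction sketch (hedged: assumes an `account`, a `storage`
    /// instance, a `ledger` service, and a `storage_mode` are already in hand;
    /// `None` for the IP and an empty trusted-validator list are placeholders):
    ///
    /// ```ignore
    /// let primary = Primary::new(account, storage, ledger, None, &[], storage_mode)?;
    /// ```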
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway =
            Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.dev())?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());

        // Initialize the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

    /// Loads the proposal cache from the file system, restoring the proposed batch,
    /// the signed proposals, the propose lock, and any pending certificates.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the proposal cache from the file system, if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    // Extract the proposal cache contents.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Restore the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Restore the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Restore the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Reload the pending certificates into storage.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // A dummy self IP is used, since the transmissions should already be
                        // in storage; if not, the certificate is simply skipped.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            false => Ok(()),
        }
    }

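    /// Runs the primary instance: spawns the workers, initializes the sync module,
    /// starts the gateway, and launches the event handlers.
    ///
    /// A hedged usage sketch (assumes a channel-initialization helper such as
    /// `init_primary_channels` from this crate's `helpers` module; adjust to your setup):
    ///
    /// ```ignore
    /// let (primary_sender, primary_receiver) = init_primary_channels::<N>();
    /// primary.run(None, primary_sender, primary_receiver).await?;
    /// ```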
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender, if one was provided.
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct the workers.
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // Initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Initialize the sync module.
        self.sync.initialize(bft_sender).await?;
        // Load the proposal cache.
        self.load_proposal_cache().await?;
        // Run the sync module.
        self.sync.run(sync_receiver).await?;
        // Run the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Start the primary handlers.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    /// Returns the number of unconfirmed solutions.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    /// Returns the number of unconfirmed transactions.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    /// Clears the worker solutions.
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
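    /// Proposes the batch for the current round.
    ///
    /// This method performs the following steps:
    /// 1. Drain the workers of unconfirmed transmissions, up to the batch limits.
    /// 2. Sign the batch header.
    /// 3. Construct the proposal and broadcast it to all validators for signing.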
    pub async fn propose_batch(&self) -> Result<()> {
        // This function is not re-entrant; hold the propose lock for its duration.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if so.
        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current round is behind the latest proposal round, return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is a batch currently being proposed, re-broadcast it to any
        // non-signers and return early.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure storage is caught up to the proposal before re-broadcasting it.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the event.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            // Resend the batch proposal to each validator that has not yet signed it.
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the address to the peer IP.
                match self.gateway.resolver().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure the primary does not propose a new batch too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(());
        }

        // Ensure the primary has not already certified a batch for this round.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // The primary is ready to propose the next round.
                    Ok(true) => (),
                    // The primary is not ready to propose the next round.
                    Ok(false) => return Ok(()),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Determine if this round has already been proposed.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators = self.gateway.connected_addresses();
            // Append the primary itself to the set.
            connected_validators.insert(self.gateway.account().address());
            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: Round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached quorum.
        if previous_round > 0 {
            // Retrieve the committee lookback for the previous round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors of the previous certificates.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Track the cumulative proposal cost.
        let mut proposal_cost = 0u64;
        // Note: the draining logic below assumes a single worker.
        debug_assert_eq!(MAX_WORKERS, 1);

        // Drain the workers of the required number of transmissions.
        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // If the batch is full, stop draining entirely.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    break 'outer;
                }

                // If this worker has reached its limit, move on to the next worker.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    continue 'outer;
                }

                // If the transmission is already in the ledger, skip it.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // If the transmission is already in storage, skip it.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check that the transmission is well-formed.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check that the solution is well-formed.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction on a blocking task.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Check that the transaction is well-formed.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the transaction's spent cost.
                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(transaction_id, transaction)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Add the cost to the cumulative proposal cost, guarding against overflow.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // If the batch spend limit would be exceeded, re-queue the transaction and stop draining.
                        if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                BatchHeader::<N>::BATCH_SPEND_LIMIT
                            );

                            // Put the transmission back at the front of the worker's queue.
                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Update the cumulative proposal cost.
                        proposal_cost = next_proposal_cost;
                    }
                    // Note: ratifications are currently unsupported; skip them.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // Skip any mismatched transmission ID and transmission pairs.
                    _ => continue,
                }

                // Insert the transmission into the map.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.gateway.account().private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Retrieve the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Retrieve the previous certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On failure, re-insert the transmissions back into the workers.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        // Set the proposed batch.
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

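    /// Processes a batch proposal from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Verify the batch header and its transmissions.
    /// 2. Sign the batch ID.
    /// 3. Broadcast the signature back to the proposing validator.
    ///
    /// If our primary is ahead of the peer, the batch is not signed.
    /// If our primary is behind the peer but within GC range, it syncs up
    /// to the peer's round before signing.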
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // Ensure the round in the proposal matches the round in the batch header.
        if batch_round != batch_header.round() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        // Retrieve the batch author.
        let batch_author = batch_header.author();

        // Ensure the batch proposal is from the validator at the peer IP.
        match self.gateway.resolver().get_address(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    // Proceed to disconnect the validator.
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // Ensure the batch author is a current committee member.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // Ensure the batch proposal is not from the current primary.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // Ensure the batch proposal's committee ID matches the expected committee ID.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // Retrieve the cached round, batch ID, and signature for this validator, if they exist.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // If the signed round is ahead of the peer's batch round, then the validator is malicious.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // If the round matches but the batch ID differs, then the validator is malicious.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // If both the round and batch ID match, skip signing the batch a second time
            // and instead resend the cached signature to the peer.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} to '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                // Return early.
                return Ok(());
            }
        }

        // Check that the batch is not already in storage.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Compute the previous round.
        let previous_round = batch_round.saturating_sub(1);
        // Ensure the peer did not propose a batch too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        // Ensure the batch header does not contain ratifications, which are currently unsupported.
        if batch_header.contains(TransmissionID::Ratification) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'",
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        // Ensure the missing transmissions are well-formed.
        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}");
            return Ok(());
        }

        // Ensure this primary is still signing for the batch round.
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Check the batch header against storage, and retrieve the missing transmissions.
        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        // Insert the missing transmissions into the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // From consensus V5 onwards, ensure the batch proposal does not exceed the spend limit.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                // Determine the worker that holds the transmission.
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                // Retrieve the transmission from the worker.
                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                // Accumulate the spent cost of each transaction.
                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    // Deserialize the transaction on a blocking task.
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    // Compute the transaction's spent cost.
                    let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(*transaction_id, transaction)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Guard the cumulative proposal cost against overflow.
                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Ensure the proposal stays within the batch spend limit.
                    if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            BatchHeader::<N>::BATCH_SPEND_LIMIT
                        );
                    }

                    proposal_cost = next_proposal_cost;
                }
            }
        }

        /* Proceeding to sign the batch. */

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();
        // Sign the batch ID.
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        // Cache the signature under the batch author's address.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If a batch for this round was already signed, do not sign another one.
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Broadcast the signature back to the proposing validator.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

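    /// Processes a batch signature from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Ensure the proposed batch has not expired.
    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
    /// 3. Store the signature in the proposal.
    /// 4. If quorum threshold is reached, certify the batch and broadcast the certificate.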
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        // Ensure the proposed batch has not expired.
        self.check_proposed_batch_for_expiration().await?;

        // Retrieve the batch ID and signature.
        let BatchSignature { batch_id, signature } = batch_signature;

        // Retrieve the signer.
        let signer = signature.to_address();

        // Ensure the batch signature is signed by the validator at the peer IP.
        if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        // Ensure the batch signature is not from the current primary.
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            // Acquire the write lock on the proposed batch.
            let mut proposed_batch = self_.proposed_batch.write();
            // Add the signature to the batch, and determine if the batch is ready to be certified.
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    // Ensure the batch ID matches the currently proposed batch ID.
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            // If this batch was already certified, return early.
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            // Otherwise, the peer sent a signature for an unknown batch ID.
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    // Retrieve the committee lookback for the round.
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    // Retrieve the address of the validator.
                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    // Add the signature to the batch.
                    proposal.add_signature(signer, signature, &committee_lookback)?;
                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                    // If the batch has not yet reached quorum threshold, return early.
                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                        return Ok(None);
                    }
                }
                // There is no proposed batch, so return early.
                None => return Ok(None),
            };
            // Retrieve the batch proposal, clearing the proposed batch.
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        /* Proceeding to certify the batch. */

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        // Retrieve the committee lookback for the round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        // Store the certified batch and broadcast it to all validators.
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            // On failure, re-insert the transmissions back into the workers.
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

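    /// Processes a batch certificate from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Store the given batch certificate, after ensuring it is valid.
    /// 2. If there are enough certificates to reach quorum threshold for the
    ///    certificate round, proceed to advance to the next round.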
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // If the certificate is already processed, return early; otherwise, stash it
        // as unprocessed so it can be recovered on restart.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Retrieve the batch certificate author.
        let author = certificate.author();
        // Retrieve the batch certificate round.
        let certificate_round = certificate.round();
        // Retrieve the batch certificate committee ID.
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from the current primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Perform the storage checks on the incoming certificate.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after ensuring it is valid.
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // Retrieve the committee lookback for the certificate round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Check if the certificates for this round have reached quorum threshold.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure the certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Determine whether we should advance, based on the currently proposed round.
        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // If quorum was reached and the certificate round is not behind, advance to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
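    /// Starts the primary handlers.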
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping sender.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly between pings.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve the latest certificate of this primary.
                let primary_certificate = {
                    // Retrieve the primary address.
                    let primary_address = self_.gateway.account().address();

                    // Walk back from the current round to find this primary's latest certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // If round 0 is reached, break early.
                        if current_round == 0 {
                            break;
                        }
                        // Retrieve this primary's certificate for the round, if it exists.
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            // Otherwise, try the previous round.
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    match certificate {
                        Some(certificate) => certificate,
                        // If no certificate was found, skip this ping.
                        None => continue,
                    }
                };

                // Construct and broadcast the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the node is not synced, do not process the primary ping.
                if !self_.sync.is_synced() {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Process the primary certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping sender.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                // If the node is not synced, do not broadcast the worker ping(s).
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Broadcast the worker ping(s).
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the node is not synced, do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // If a proposal is already in flight, do not propose another batch.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                }
                // Propose a batch.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Start the batch propose handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the node is not synced, do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch proposal.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Start the batch signature handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the node is not synced, do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                let id = fmt_id(batch_signature.batch_id);
                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
                }
            }
        });

        // Start the batch certificate handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the node is not synced, do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Periodically try to increment to the next round.
        // Note: This ensures the primary does not get stuck on a previous round,
        // despite having received enough certificates to advance.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the node is not synced, do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Compute the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Determine if quorum threshold is reached for the next round.
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // If there are no certificates for the next round yet, skip this iteration.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {next_round}");
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Start the unconfirmed solution handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Start the unconfirmed transaction handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }

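    /// Checks whether the proposed batch has expired, and if so, clears it and
    /// returns its transmissions to the workers.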
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        // Check if the proposed batch is expired.
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        // If the batch is expired, clear the proposed batch.
        if is_expired {
            // Reset the proposed batch.
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

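    /// Increments the primary to the next round, fast-forwarding storage if the
    /// next round is within GC range, and proposing a new batch once ready.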
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, use it to advance to the next round.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            } else {
                // Otherwise, update to the next round in storage directly.
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            // Log whether the primary is ready to propose the next round.
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // If the primary is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

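    /// Ensures the primary is still signing for the specified batch round:
    /// the round must be within GC range, and at most one round behind the current round.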
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        // Retrieve the current round.
        let current_round = self.current_round();
        // Ensure the batch round is within GC range of the current round.
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        // Ensure the batch round is at most one round behind the current round.
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        // Ensure our own proposal is not ahead of the batch round.
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

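    /// Ensures the given proposal timestamp is not too soon after the author's
    /// previous certificate (or, for our own proposals, the latest proposed batch).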
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the previous timestamp to check against.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                // If we are the author, check against our latest proposed batch timestamp.
                true => *self.latest_proposed_batch_timestamp.read(),
                // If the author's previous certificate is unknown, there is nothing to check against.
                false => return Ok(()),
            },
        };

        // Determine the elapsed time since the previous timestamp.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        // Ensure the elapsed time is at least the minimum batch delay.
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

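    /// Stores the certified batch, broadcasts the certificate to all validators,
    /// and attempts to advance to the next round.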
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap; its ordering is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }

    /// Inserts the missing transmissions from the peer into the workers.
    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        // Assign each transmission to its worker.
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    /// Re-inserts the transmissions from a failed or expired proposal back into the workers.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Re-assign each transmission to its worker.
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

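    /// Recursively stores the given batch certificate, after ensuring:
    ///   - the certificate round is within range,
    ///   - the certificate is not already in storage,
    ///   - all of its transmissions are present, and
    ///   - all of its previous certificates are present.
    ///
    /// If a missing previous certificate cannot be fetched, this method fails.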
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the node is not in sync mode and is not synced, return early.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync up to the batch header, and retrieve the missing transmissions.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate still needs to be stored.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Await the callback to continue.
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

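    /// Syncs up to the given batch header and returns the missing transmissions:
    /// advances the round if the peer is ahead and quorum was reached, then fetches
    /// any missing transmissions and previous certificates from the peer.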
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the peer's batch round is outdated, do not sync with it.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If the node is not in sync mode and is not synced, return early.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine if quorum threshold is reached on the batch round.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Check if our primary is behind schedule on the batch round.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        // Check if our primary is far behind the peer.
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // If either holds, advance to the batch round.
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Fetch the missing transmissions.
        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
        // Fetch the missing previous certificates.
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        // Await both fetches concurrently.
        let (missing_transmissions, missing_previous_certificates) =
            tokio::try_join!(missing_transmissions_handle, missing_previous_certificates_handle).map_err(|e| {
                anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
            })?;

        // Iterate through the missing previous certificates.
        for batch_certificate in missing_previous_certificates {
            // Store the batch certificate, recursively fetching any of its missing previous certificates.
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }

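    /// Fetches any missing transmissions for the given batch header from the specified peer.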
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // If the round is outdated, return early.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // If the batch was already processed, return early.
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        // Retrieve the workers.
        let workers = self.workers.clone();

        // Initialize a list of transmission fetches.
        let mut fetch_transmissions = FuturesUnordered::new();

        // Retrieve the number of workers.
        let num_workers = self.num_workers();
        // Iterate through the transmission IDs.
        for transmission_id in batch_header.transmission_ids() {
            // If the transmission does not exist in storage, proceed to fetch it.
            if !self.storage.contains_transmission(*transmission_id) {
                // Determine the worker ID.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                // Retrieve the worker.
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                // Push the fetch into the list.
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        // Initialize a map for the transmissions.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        // Wait for all of the fetches to complete.
        while let Some(result) = fetch_transmissions.next().await {
            // Retrieve the transmission.
            let (transmission_id, transmission) = result?;
            // Insert the transmission into the map.
            transmissions.insert(transmission_id, transmission);
        }
        // Return the transmissions.
        Ok(transmissions)
    }

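    /// Fetches any missing previous certificates for the given batch header from the specified peer.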
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Retrieve the round.
        let round = batch_header.round();
        // If the previous round is 0, or is at or below the GC round, return early.
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        // Fetch the missing previous certificates.
        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        // Return the missing previous certificates.
        Ok(missing_previous_certificates)
    }

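    /// Fetches the certificates with the given IDs from the specified peer,
    /// skipping any that are already in the ledger, in storage, or cached as unprocessed.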
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Initialize a list of certificate fetches.
        let mut fetch_certificates = FuturesUnordered::new();
        // Initialize a set for the missing certificates.
        let mut missing_certificates = HashSet::default();
        // Iterate through the certificate IDs.
        for certificate_id in certificate_ids {
            // If the certificate is already in the ledger, skip it.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            // If the certificate already exists in storage, skip it.
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // If the certificate was already fetched but not yet processed, reuse it.
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                // Proceed to fetch the certificate from the peer.
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        // If there are no certificates to fetch, return early.
        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Wait for all of the fetches to complete, and insert the certificates into the set.
        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        // Return the missing certificates.
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
    /// Spawns a task with the given future; this should only be used for long-running tasks.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Shut down the workers.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort the spawned tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Save the current proposal cache to disk.
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        if let Err(err) = proposal_cache.store(&self.storage_mode) {
            error!("Failed to store the current proposal cache: {err}");
        }
        // Shut down the gateway.
        self.gateway.shut_down().await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use snarkos_node_bft_ledger_service::MockLedgerService;
    use snarkos_node_bft_storage_service::BFTMemoryService;
    use snarkvm::{
        ledger::{
            committee::{Committee, MIN_VALIDATOR_STAKE},
            ledger_test_helpers::sample_execution_transaction_with_fee,
        },
        prelude::{Address, Signature},
    };

    use bytes::Bytes;
    use indexmap::IndexSet;
    use rand::RngCore;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    // Returns a sample committee of four validators, along with their accounts.
    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
        const COMMITTEE_SIZE: usize = 4;
        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
        let mut members = IndexMap::new();

        for i in 0..COMMITTEE_SIZE {
            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
            let account = Account::new(rng).unwrap();

            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
            accounts.push((socket_addr, account));
        }

        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
    }

    // Returns a primary for the validator at `account_index`, with a single worker.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);

        // Initialize the primary.
        let account = accounts[account_index].1.clone();
        let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None)).unwrap();

        // Construct a worker instance.
        primary.workers = Arc::from([Worker::new(
            0,
            Arc::new(primary.gateway.clone()),
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
        )
        .unwrap()]);
        for a in accounts.iter().skip(account_index) {
            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
        }

        primary
    }

    fn primary_without_handlers(
        rng: &mut TestRng,
    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
        let (accounts, committee) = sample_committee(rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee,
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
        );

        (primary, accounts)
    }

    // Samples a random unconfirmed solution.
    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
        // Sample a random fake solution ID.
        let solution_id = rng.gen::<u64>().into();
        // Sample random fake solution bytes.
        let size = rng.gen_range(1024..10 * 1024);
        let mut vec = vec![0u8; size];
        rng.fill_bytes(&mut vec);
        let solution = Data::Buffer(Bytes::from(vec));
        // Return the solution ID and solution.
        (solution_id, solution)
    }

    // Samples a random unconfirmed transaction.
    fn sample_unconfirmed_transaction(
        rng: &mut TestRng,
    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
        let transaction = sample_execution_transaction_with_fee(false, rng);
        let id = transaction.id();

        (id, Data::Object(transaction))
    }

    // Creates a test proposal containing a solution and the given number of transactions.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        num_transactions: u64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        let mut transmission_ids = IndexSet::new();
        let mut transmissions = IndexMap::new();

        // Prepare the solution and insert it into the sets.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let solution_transmission_id = (solution_id, solution_checksum).into();
        transmission_ids.insert(solution_transmission_id);
        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));

        // Prepare the transactions and insert them into the sets.
        for _ in 0..num_transactions {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
            transmission_ids.insert(transaction_transmission_id);
            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
        }

        // Sign the batch header and construct the proposal.
        let private_key = author.private_key();
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }

    // Returns a batch signature from each peer account for the currently proposed batch.
    fn peer_signatures_for_proposal(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        rng: &mut TestRng,
    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
        // Each committee member signs the batch.
        let mut signatures = Vec::with_capacity(accounts.len() - 1);
        for (socket_addr, account) in accounts {
            if account.address() == primary.gateway.account().address() {
                continue;
            }
            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
        }

        signatures
    }

    // Returns a signature from each peer account (except the primary) for the given batch ID.
    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

    // Creates a batch certificate signed by the peers, along with its transmissions.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

    // Stores a certificate from each account for every round up to (but excluding) `round`,
    // and returns the certificate IDs of the penultimate round.
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }
2157
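    // Registers each account's address with the gateway resolver, skipping the first
    // account, which presumably belongs to the primary itself.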
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
        }
    }

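    // Verifies that a primary with queued transmissions produces a batch proposal.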
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

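    // Verifies that a new proposal skips transmissions already covered by certificates from
    // the previous round, proposing only the two freshly submitted transmissions.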
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let mut num_transmissions_in_previous_round = 0;

        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers[0].num_transmissions(), 2);

        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        assert!(primary.storage.increment_to_next_round(round).is_ok());

        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        assert!(primary.propose_batch().await.is_ok());

        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }

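    // Verifies that a proposal under consensus V4 includes only as many transmissions as fit
    // (presumably under the block spend limit), leaving the remainder queued in the workers.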
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        assert!(primary.proposed_batch.read().is_none());
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _ in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        assert!(primary.propose_batch().await.is_ok());
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

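    // Verifies that a synced primary accepts a well-formed batch proposal from a peer.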
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

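    // Verifies that a batch proposal with a timestamp in the past is rejected.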
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // Use a timestamp that lies in the past.
        let past_timestamp = now() - 100;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            past_timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary_v4.sync.block_sync().try_block_sync(&primary_v4.gateway.clone()).await;
        primary_v5.sync.block_sync().try_block_sync(&primary_v5.gateway.clone()).await;

        // The V4 primary accepts the proposal, while the V5 primary rejects it, presumably
        // because the spend limit is enforced from consensus V5 onward.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

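    // Verifies that no batch is proposed while the propose lock is ahead of the storage round,
    // and that proposing succeeds once the lock is restored.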
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        *primary.propose_lock.lock().await = old_proposal_lock_round;

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

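    // Verifies that gathering signatures from all other committee members certifies the
    // proposed batch and advances the primary to the next round.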
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

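    // Verifies that a certificate referencing aborted transmissions is only accepted when the
    // aborted IDs are supplied explicitly, and that aborted transmissions are then tracked in
    // storage without a retrievable payload.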
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers[0].num_transmissions(), 2);

        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly mark transmissions as aborted, ensuring at least one is aborted.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Without declaring the aborted IDs, the certificate fails both the check and the insert.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Supplying the aborted transmission IDs allows the insert to succeed.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        assert!(primary.storage.contains_certificate(certificate_id));
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}