use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::DUMMY_SELF_IP;
use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::committee::Committee,
};

use aleo_std::StorageMode;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};
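/// A shared handle to the batch proposal currently awaiting signatures, if any.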
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

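/// The primary instance of the memory pool: it proposes batches of transmissions,
/// collects committee signatures on them, and certifies them into the BFT DAG.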
#[derive(Clone)]
pub struct Primary<N: Network> {
    sync: Sync<N>,
    gateway: Gateway<N>,
    storage: Storage<N>,
    ledger: Arc<dyn LedgerService<N>>,
    workers: Arc<[Worker<N>]>,
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    proposed_batch: Arc<ProposedBatch<N>>,
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    propose_lock: Arc<TMutex<u64>>,
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

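    /// Initializes a new primary instance.
    ///
    /// A minimal wiring sketch, modeled on this file's test helpers (`MockLedgerService`,
    /// `BFTMemoryService`) and the crate's channel helper `init_primary_channels`; the
    /// committee, account, and surrounding async context are assumed:
    ///
    /// ```ignore
    /// let ledger = Arc::new(MockLedgerService::new_at_height(committee, 0));
    /// let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    /// let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
    /// // Wire the primary channels and start the instance (no BFT sender here).
    /// let (primary_sender, primary_receiver) = init_primary_channels();
    /// primary.run(None, primary_sender, primary_receiver).await?;
    /// ```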
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
    ) -> Result<Self> {
        let gateway =
            Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.dev())?;
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());

        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

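    /// Restores the proposal cache from the file system, if one exists: the cached proposal,
    /// signed proposals, and latest certificate round are reloaded, and any pending
    /// certificates are replayed into storage.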
    async fn load_proposal_cache(&self) -> Result<()> {
        match ProposalCache::<N>::exists(&self.storage_mode) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            false => Ok(()),
        }
    }

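    /// Runs the primary instance: spawns the workers, initializes the sync module,
    /// restores the proposal cache, starts the gateway, and launches the event handlers.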
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let (tx_worker, rx_worker) = init_worker_channels();
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        self.workers = Arc::from(workers);

        let (sync_sender, sync_receiver) = init_sync_channels();
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run(sync_receiver).await?;
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        self.start_handlers(primary_receiver);

        Ok(())
    }

    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
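    /// Proposes a batch of transmissions for the current round, if the primary is ready:
    /// the previous round must have reached quorum, the round must not already be certified
    /// or proposed, and enough validators must be connected. On success, the proposal is
    /// broadcast to the committee for signing.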
    pub async fn propose_batch(&self) -> Result<()> {
        let mut lock_guard = self.propose_lock.lock().await;

        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                match self.gateway.resolver().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(());
        }

        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(()),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    break 'outer;
                }

                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    continue 'outer;
                }

                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(transaction_id, transaction)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                BatchHeader::<N>::BATCH_SPEND_LIMIT
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    _ => continue,
                }

                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

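    /// Processes a batch proposal received from a peer: verifies the author, round,
    /// committee ID, timestamp, and transmissions, then signs the batch and returns
    /// the signature to the proposer. Malicious proposals cause a disconnect.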
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        let batch_author = batch_header.author();

        match self.gateway.resolver().get_address(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        let previous_round = batch_round.saturating_sub(1);
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'");
        }

        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}");
            return Ok(());
        }

        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(*transaction_id, transaction)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            BatchHeader::<N>::BATCH_SPEND_LIMIT
                        );
                    }

                    proposal_cost = next_proposal_cost;
                }
            }
        }

        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

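    /// Processes a batch signature received from a peer: the signature is verified against
    /// the resolved peer address and added to the current proposal. Once the proposal
    /// reaches quorum, the batch is certified, stored, and broadcast.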
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        self.check_proposed_batch_for_expiration().await?;

        let BatchSignature { batch_id, signature } = batch_signature;

        let signer = signature.to_address();

        if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            let mut proposed_batch = self_.proposed_batch.write();
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    proposal.add_signature(signer, signature, &committee_lookback)?;
                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                        return Ok(None);
                    }
                }
                None => return Ok(None),
            };
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

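    /// Processes a batch certificate received from a peer: the certificate is validated,
    /// stored, and forwarded to the BFT DAG; if the certificate round has reached quorum,
    /// the primary attempts to advance to the next round.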
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        self.storage.check_incoming_certificate(&certificate)?;

        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        let current_round = self.current_round();

        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
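    /// Starts the primary's background handlers: periodic primary and worker pings,
    /// the batch proposal timer, the round-advancement timer, and the receivers for
    /// batch proposals, signatures, certificates, and unconfirmed transmissions.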
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    match certificate {
                        Some(certificate) => certificate,
                        None => continue,
                    }
                };

                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let id = fmt_id(batch_signature.batch_id);
                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let next_round = self_.current_round().saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    callback.send(result).ok();
                });
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    callback.send(result).ok();
                });
            }
        });
    }

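    /// Clears the proposed batch if it is for an earlier round than the current round,
    /// reinserting its transmissions into the workers.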
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        if is_expired {
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

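    /// Increments storage to `next_round`, fast-forwarding through any intermediate rounds
    /// within the GC window, then notifies the BFT (if enabled) and, when ready, proposes
    /// a batch for the new round.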
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            } else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

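    /// Ensures the primary is still willing to sign a batch for `batch_round`: the round
    /// must be within the GC window, at most one round behind the current round, and not
    /// behind our own in-flight proposal.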
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        let current_round = self.current_round();
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

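    /// Ensures the given proposal timestamp is at least `MIN_BATCH_DELAY_IN_SECS` after the
    /// author's certificate in the previous round (or, for our own proposals, after the
    /// latest proposed batch timestamp).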
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                true => *self.latest_proposed_batch_timestamp.read(),
                false => return Ok(()),
            },
        };

        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

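    /// Certifies the given proposal into a batch certificate, stores it (with its
    /// transmissions), forwards it to the BFT DAG, broadcasts it to the committee, and
    /// advances to the next round.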
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        self.try_increment_to_the_next_round(round + 1).await
    }

    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

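    /// Recursively syncs storage with the given batch certificate from a peer, fetching any
    /// missing transmissions and previous certificates first. `IS_SYNCING` bypasses the
    /// node's own sync check (used when replaying the proposal cache).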
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

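    /// Syncs storage with the given batch header from a peer: advances the round if we are
    /// behind schedule or the peer is far ahead, then fetches the header's missing
    /// transmissions and previous certificates concurrently, returning the missing
    /// transmissions.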
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        let (missing_transmissions, missing_previous_certificates) =
            tokio::try_join!(missing_transmissions_handle, missing_previous_certificates_handle).map_err(|e| {
                anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
            })?;

        for batch_certificate in missing_previous_certificates {
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }

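    /// Fetches any transmissions referenced by the batch header that are missing from
    /// storage, requesting each one from the appropriate worker in parallel.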
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }

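    /// Fetches any previous certificates referenced by the batch header that are missing
    /// from storage.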
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let round = batch_header.round();
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        Ok(missing_previous_certificates)
    }

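    /// Resolves the given certificate IDs for a round: IDs already in the ledger or storage
    /// are skipped, unprocessed certificates are reused, and the rest are requested from
    /// the peer.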
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let mut fetch_certificates = FuturesUnordered::new();
        let mut missing_certificates = HashSet::default();
        for certificate_id in certificate_ids {
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
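    /// Spawns the given future onto the Tokio runtime, tracking its handle for shutdown.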
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

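    /// Shuts down the primary: stops the workers and background tasks, persists the current
    /// proposal cache to disk, and shuts down the gateway.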
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        self.workers.iter().for_each(|worker| worker.shut_down());
        self.handles.lock().iter().for_each(|handle| handle.abort());
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        if let Err(err) = proposal_cache.store(&self.storage_mode) {
            error!("Failed to store the current proposal cache: {err}");
        }
        self.gateway.shut_down().await;
    }
}

1896#[cfg(test)]
1897mod tests {
1898 use super::*;
1899 use snarkos_node_bft_ledger_service::MockLedgerService;
1900 use snarkos_node_bft_storage_service::BFTMemoryService;
1901 use snarkvm::{
1902 ledger::{
1903 committee::{Committee, MIN_VALIDATOR_STAKE},
1904 ledger_test_helpers::sample_execution_transaction_with_fee,
1905 },
1906 prelude::{Address, Signature},
1907 };
1908
1909 use bytes::Bytes;
1910 use indexmap::IndexSet;
1911 use rand::RngCore;
1912
1913 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1914
1915 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1916 const COMMITTEE_SIZE: usize = 4;
1918 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1919 let mut members = IndexMap::new();
1920
1921 for i in 0..COMMITTEE_SIZE {
1922 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1923 let account = Account::new(rng).unwrap();
1924
1925 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1926 accounts.push((socket_addr, account));
1927 }
1928
1929 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1930 }
1931
1932 fn primary_with_committee(
1934 account_index: usize,
1935 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1936 committee: Committee<CurrentNetwork>,
1937 height: u32,
1938 ) -> Primary<CurrentNetwork> {
1939 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
1940 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1941
1942 let account = accounts[account_index].1.clone();
1944 let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None)).unwrap();
1945
1946 primary.workers = Arc::from([Worker::new(
1948 0, Arc::new(primary.gateway.clone()),
1950 primary.storage.clone(),
1951 primary.ledger.clone(),
1952 primary.proposed_batch.clone(),
1953 )
1954 .unwrap()]);
1955 for a in accounts.iter().skip(account_index) {
1956 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1957 }
1958
1959 primary
1960 }
1961
1962 fn primary_without_handlers(
1963 rng: &mut TestRng,
1964 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1965 let (accounts, committee) = sample_committee(rng);
1966 let primary = primary_with_committee(
1967 0, &accounts,
1969 committee,
1970 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
1971 );
1972
1973 (primary, accounts)
1974 }
1975
1976 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1978 let solution_id = rng.gen::<u64>().into();
1980 let size = rng.gen_range(1024..10 * 1024);
1982 let mut vec = vec![0u8; size];
1984 rng.fill_bytes(&mut vec);
1985 let solution = Data::Buffer(Bytes::from(vec));
1986 (solution_id, solution)
1988 }
1989
1990 fn sample_unconfirmed_transaction(
1992 rng: &mut TestRng,
1993 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1994 let transaction = sample_execution_transaction_with_fee(false, rng);
1995 let id = transaction.id();
1996
1997 (id, Data::Object(transaction))
1998 }
1999
2000 fn create_test_proposal(
2002 author: &Account<CurrentNetwork>,
2003 committee: Committee<CurrentNetwork>,
2004 round: u64,
2005 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2006 timestamp: i64,
2007 num_transactions: u64,
2008 rng: &mut TestRng,
2009 ) -> Proposal<CurrentNetwork> {
2010 let mut transmission_ids = IndexSet::new();
2011 let mut transmissions = IndexMap::new();
2012
2013 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2015 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2016 let solution_transmission_id = (solution_id, solution_checksum).into();
2017 transmission_ids.insert(solution_transmission_id);
2018 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2019
2020 for _ in 0..num_transactions {
2022 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2023 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2024 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2025 transmission_ids.insert(transaction_transmission_id);
2026 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2027 }
2028
2029 let private_key = author.private_key();
2031 let batch_header = BatchHeader::new(
2033 private_key,
2034 round,
2035 timestamp,
2036 committee.id(),
2037 transmission_ids,
2038 previous_certificate_ids,
2039 rng,
2040 )
2041 .unwrap();
2042 Proposal::new(committee, batch_header, transmissions).unwrap()
2044 }
2045
2046 fn peer_signatures_for_proposal(
2049 primary: &Primary<CurrentNetwork>,
2050 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2051 rng: &mut TestRng,
2052 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2053 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2055 for (socket_addr, account) in accounts {
2056 if account.address() == primary.gateway.account().address() {
2057 continue;
2058 }
2059 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2060 let signature = account.sign(&[batch_id], rng).unwrap();
2061 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2062 }
2063
2064 signatures
2065 }
2066
2067 fn peer_signatures_for_batch(
2069 primary_address: Address<CurrentNetwork>,
2070 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2071 batch_id: Field<CurrentNetwork>,
2072 rng: &mut TestRng,
2073 ) -> IndexSet<Signature<CurrentNetwork>> {
2074 let mut signatures = IndexSet::new();
2075 for (_, account) in accounts {
2076 if account.address() == primary_address {
2077 continue;
2078 }
2079 let signature = account.sign(&[batch_id], rng).unwrap();
2080 signatures.insert(signature);
2081 }
2082 signatures
2083 }
2084
2085 fn create_batch_certificate(
2087 primary_address: Address<CurrentNetwork>,
2088 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2089 round: u64,
2090 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2091 rng: &mut TestRng,
2092 ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2093 let timestamp = now();
2094
2095 let author =
2096 accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2097 let private_key = author.private_key();
2098
2099 let committee_id = Field::rand(rng);
2100 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2101 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2102 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2103 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2104
2105 let solution_transmission_id = (solution_id, solution_checksum).into();
2106 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2107
2108 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2109 let transmissions = [
2110 (solution_transmission_id, Transmission::Solution(solution)),
2111 (transaction_transmission_id, Transmission::Transaction(transaction)),
2112 ]
2113 .into();
2114
2115 let batch_header = BatchHeader::new(
2116 private_key,
2117 round,
2118 timestamp,
2119 committee_id,
2120 transmission_ids,
2121 previous_certificate_ids,
2122 rng,
2123 )
2124 .unwrap();
2125 let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2126 let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2127 (certificate, transmissions)
2128 }
2129
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

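    /// Registers each account's address with the primary's gateway resolver.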
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
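        // Skip the first account, which belongs to the primary itself.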
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
        }
    }

    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

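        // Check there is no batch currently proposed.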
        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

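        // Propose the batch; with pending transmissions in the worker, a proposal should be created.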
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let mut num_transmissions_in_previous_round = 0;

        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers[0].num_transmissions(), 2);

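        // Create certificates for the current round and seed their transmissions into the worker.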
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        assert!(primary.storage.increment_to_next_round(round).is_ok());

        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        assert!(primary.propose_batch().await.is_ok());

        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }

    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        assert!(primary.proposed_batch.read().is_none());
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _ in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

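        // Propose the batch; with the spend limit in effect, only 3 of the 6 submitted transmissions are included, and the rest remain in the workers.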
        assert!(primary.propose_batch().await.is_ok());
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
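        // Create a proposal with a timestamp in the past.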
        let past_timestamp = now() - 100;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            past_timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary_v4.sync.block_sync().try_block_sync(&primary_v4.gateway.clone()).await;
        primary_v5.sync.block_sync().try_block_sync(&primary_v5.gateway.clone()).await;

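        // The V4 primary accepts the over-limit proposal, while the V5 primary enforces the spend limit and rejects it.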
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

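        // Advance the proposal lock past the storage round; propose_batch should then return without proposing.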
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

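        // Restore the proposal lock; proposing should now succeed.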
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

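        // Submit each peer signature; once quorum is reached, the primary certifies the batch and advances the round.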
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

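        // Submit a single peer signature, which is not enough to reach quorum.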
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers[0].num_transmissions(), 2);

        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

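        // Randomly split the transmissions into aborted and non-aborted sets, ensuring at least one is aborted.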
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

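        // Checking or inserting the certificate without declaring the aborted transmissions should fail.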
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

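        // Inserting the certificate with the aborted transmissions declared should succeed.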
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

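        // The aborted transmission IDs are known to storage, but carry no transmission data.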
        assert!(primary.storage.contains_certificate(certificate_id));
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}