use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{ConsensusVersion, committee::Committee},
};

use aleo_std::StorageMode;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

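/// A shared, lockable handle to the batch proposal currently in flight, if any.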
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

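/// The primary of the memory pool.
///
/// The primary collects unconfirmed transmissions from its workers, proposes batches,
/// gathers signatures from the committee, and certifies batches into the BFT DAG.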
#[derive(Clone)]
pub struct Primary<N: Network> {
    sync: Sync<N>,
    gateway: Gateway<N>,
    storage: Storage<N>,
    ledger: Arc<dyn LedgerService<N>>,
    workers: Arc<[Worker<N>]>,
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    proposed_batch: Arc<ProposedBatch<N>>,
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    propose_lock: Arc<TMutex<u64>>,
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
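    /// A tolerance bound on the number of transmissions, equal to twice the maximum
    /// number of transmissions permitted in a single batch.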
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

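    /// Initializes a new primary instance, along with its gateway and sync module.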
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode.clone(),
            dev,
        )?;
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

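    /// Restores any proposal cache persisted on disk, reloading the proposed batch,
    /// the signed proposals, the propose lock, and any pending certificates.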
    async fn load_proposal_cache(&self) -> Result<()> {
        match ProposalCache::<N>::exists(&self.storage_mode) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            false => Ok(()),
        }
    }

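    /// Runs the primary: spawns the workers, initializes the sync module and gateway,
    /// restores the proposal cache, and starts the primary's event handlers.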
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let (tx_worker, rx_worker) = init_worker_channels();
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        self.workers = Arc::from(workers);

        let (sync_sender, sync_receiver) = init_sync_channels();
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run(ping, sync_receiver).await?;
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        self.start_handlers(primary_receiver);

        Ok(())
    }

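    /// Returns the current round.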
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
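    /// Returns the number of unconfirmed transmissions across all workers.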
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
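    /// Proposes a batch for the current round, if the primary is ready.
    ///
    /// This drains transmissions from the workers (subject to the per-batch and
    /// per-worker limits and the batch spend limit), signs a new batch header,
    /// broadcasts the proposal to the committee, and records it as the proposed batch.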
    pub async fn propose_batch(&self) -> Result<()> {
        let mut lock_guard = self.propose_lock.lock().await;

        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(());
        }

        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(()),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    _ => continue,
                }

                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

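    /// Processes a batch proposal received from a peer.
    ///
    /// Verifies that the proposal is well-formed and comes from the expected committee
    /// member, syncs any missing transmissions, enforces the batch spend limit, and,
    /// if everything checks out, signs the batch and returns the signature to the peer.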
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        let batch_author = batch_header.author();

        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} to '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        let previous_round = batch_round.saturating_sub(1);
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'"
            );
        }

        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch proposal at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}");
            return Ok(());
        }

        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
                    if block_height > consensus_version_v7_height
                        && block_height <= consensus_version_v8_height
                        && transaction.is_deploy()
                    {
                        bail!(
                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})"
                        )
                    }

                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
                    if next_proposal_cost > batch_spend_limit {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            batch_spend_limit
                        );
                    }

                    proposal_cost = next_proposal_cost;
                }
            }
        }

        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

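    /// Processes a batch signature received from a peer.
    ///
    /// Verifies the signer, records the signature on the proposed batch, and once the
    /// quorum threshold is reached, certifies the batch and broadcasts the certificate.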
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        self.check_proposed_batch_for_expiration().await?;

        let BatchSignature { batch_id, signature } = batch_signature;

        let signer = signature.to_address();

        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            let mut proposed_batch = self_.proposed_batch.write();
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    proposal.add_signature(signer, signature, &committee_lookback)?;
                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                        return Ok(None);
                    }
                }
                None => return Ok(None),
            };
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

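    /// Processes a batch certificate received from a peer.
    ///
    /// Validates the certificate, syncs any missing transmissions and previous
    /// certificates, and advances to the next round once a quorum of certificates
    /// exists for the certificate's round.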
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        self.storage.check_incoming_certificate(&certificate)?;

        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        let current_round = self.current_round();

        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
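    /// Starts the primary's background handlers: the primary and worker ping loops,
    /// the batch proposal loop, the round-increment loop, and the receivers for batch
    /// proposals, signatures, certificates, and unconfirmed solutions and transactions.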
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    match certificate {
                        Some(certificate) => certificate,
                        None => continue,
                    }
                };

                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                }
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let id = fmt_id(batch_signature.batch_id);
                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    callback.send(result).ok();
                });
            }
        });

        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    callback.send(result).ok();
                });
            }
        });
    }

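    /// Clears the proposed batch if it is for a past round, reinserting its
    /// transmissions back into the workers.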
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        if is_expired {
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

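    /// Attempts to advance storage to `next_round`, fast-forwarding through any
    /// intermediate rounds (within the GC bound), notifying the BFT of the round
    /// change, and proposing a batch if the primary is ready.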
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            } else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

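    /// Ensures the primary is still willing to sign a batch for `batch_round`:
    /// the round must not be beyond the GC horizon, more than one round behind
    /// the current round, or older than our own proposal's round.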
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        let current_round = self.current_round();
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

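    /// Checks that `timestamp` is at least `MIN_BATCH_DELAY_IN_SECS` after the
    /// author's certificate from the previous round (or after our own last proposal,
    /// when no such certificate exists and the author is this primary).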
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                true => *self.latest_proposed_batch_timestamp.read(),
                false => return Ok(()),
            },
        };

        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

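    /// Converts a quorum-signed proposal into a batch certificate, stores it,
    /// forwards it to the BFT DAG, broadcasts it to the committee, and advances
    /// to the next round.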
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        self.try_increment_to_the_next_round(round + 1).await
    }

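    /// Routes the given transmissions to their assigned workers as if they had
    /// been received from `peer_ip`.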
    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

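    /// Recursively syncs the storage with a batch certificate received from a peer,
    /// fetching any missing transmissions and previous certificates first, then
    /// storing the certificate and forwarding it to the BFT DAG.
    ///
    /// When `IS_SYNCING` is false, the call bails out if the node itself is still syncing.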
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

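    /// Syncs the storage with a batch header received from a peer, fetching any
    /// missing transmissions and previous certificates in parallel, and returns the
    /// missing transmissions. Advances the round first if the peer is ahead of us.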
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
            missing_transmissions_handle,
            missing_previous_certificates_handle,
        )
        .map_err(|e| {
            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        for batch_certificate in missing_previous_certificates {
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }

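    /// Fetches any transmissions referenced by the batch header that are missing
    /// from storage, requesting each one from its assigned worker.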
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }

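    /// Fetches any certificates from the previous round that the batch header
    /// references but that are missing from storage.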
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let round = batch_header.round();
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        Ok(missing_previous_certificates)
    }

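    /// Fetches the given certificates from the peer, skipping any that are already
    /// in the ledger or storage and reusing any that are queued as unprocessed.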
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let mut fetch_certificates = FuturesUnordered::new();
        let mut missing_certificates = HashSet::default();
        for certificate_id in certificate_ids {
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
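    /// Spawns a task on the tokio runtime and tracks its handle for shutdown.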
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

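    /// Shuts down the primary: stops the workers, aborts the background tasks,
    /// persists the proposal cache to disk, and shuts down the gateway.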
1941 pub async fn shut_down(&self) {
1943 info!("Shutting down the primary...");
1944 self.workers.iter().for_each(|worker| worker.shut_down());
1946 self.handles.lock().iter().for_each(|handle| handle.abort());
1948 let proposal_cache = {
1950 let proposal = self.proposed_batch.write().take();
1951 let signed_proposals = self.signed_proposals.read().clone();
1952 let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1953 let pending_certificates = self.storage.get_pending_certificates();
1954 ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1955 };
1956 if let Err(err) = proposal_cache.store(&self.storage_mode) {
1957 error!("Failed to store the current proposal cache: {err}");
1958 }
1959 self.gateway.shut_down().await;
1961 }
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966 use super::*;
1967 use snarkos_node_bft_ledger_service::MockLedgerService;
1968 use snarkos_node_bft_storage_service::BFTMemoryService;
1969 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
1970 use snarkvm::{
1971 ledger::{
1972 committee::{Committee, MIN_VALIDATOR_STAKE},
1973 test_helpers::sample_execution_transaction_with_fee,
1974 },
1975 prelude::{Address, Signature},
1976 };
1977
1978 use bytes::Bytes;
1979 use indexmap::IndexSet;
1980 use rand::RngCore;
1981
1982 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1983
1984 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1985 const COMMITTEE_SIZE: usize = 4;
1987 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1988 let mut members = IndexMap::new();
1989
1990 for i in 0..COMMITTEE_SIZE {
1991 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1992 let account = Account::new(rng).unwrap();
1993
1994 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1995 accounts.push((socket_addr, account));
1996 }
1997
1998 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1999 }
2000
2001 fn primary_with_committee(
2003 account_index: usize,
2004 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2005 committee: Committee<CurrentNetwork>,
2006 height: u32,
2007 ) -> Primary<CurrentNetwork> {
2008 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2009 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2010
2011 let account = accounts[account_index].1.clone();
2013 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2014 let mut primary =
2015 Primary::new(account, storage, ledger, block_sync, None, &[], false, StorageMode::new_test(None), None)
2016 .unwrap();
2017
2018 primary.workers = Arc::from([Worker::new(
2020 0, Arc::new(primary.gateway.clone()),
2022 primary.storage.clone(),
2023 primary.ledger.clone(),
2024 primary.proposed_batch.clone(),
2025 )
2026 .unwrap()]);
2027 for a in accounts.iter().skip(account_index) {
2028 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2029 }
2030
2031 primary
2032 }
2033
2034 fn primary_without_handlers(
2035 rng: &mut TestRng,
2036 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2037 let (accounts, committee) = sample_committee(rng);
2038 let primary = primary_with_committee(
2039 0, &accounts,
2041 committee,
2042 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2043 );
2044
2045 (primary, accounts)
2046 }
2047
2048 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2050 let solution_id = rng.r#gen::<u64>().into();
2052 let size = rng.gen_range(1024..10 * 1024);
2054 let mut vec = vec![0u8; size];
2056 rng.fill_bytes(&mut vec);
2057 let solution = Data::Buffer(Bytes::from(vec));
2058 (solution_id, solution)
2060 }
2061
2062 fn sample_unconfirmed_transaction(
2064 rng: &mut TestRng,
2065 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2066 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2067 let id = transaction.id();
2068
2069 (id, Data::Object(transaction))
2070 }
2071
2072 fn create_test_proposal(
2074 author: &Account<CurrentNetwork>,
2075 committee: Committee<CurrentNetwork>,
2076 round: u64,
2077 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2078 timestamp: i64,
2079 num_transactions: u64,
2080 rng: &mut TestRng,
2081 ) -> Proposal<CurrentNetwork> {
2082 let mut transmission_ids = IndexSet::new();
2083 let mut transmissions = IndexMap::new();
2084
2085 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2087 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2088 let solution_transmission_id = (solution_id, solution_checksum).into();
2089 transmission_ids.insert(solution_transmission_id);
2090 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2091
2092 for _ in 0..num_transactions {
2094 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2095 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2096 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2097 transmission_ids.insert(transaction_transmission_id);
2098 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2099 }
2100
2101 let private_key = author.private_key();
2103 let batch_header = BatchHeader::new(
2105 private_key,
2106 round,
2107 timestamp,
2108 committee.id(),
2109 transmission_ids,
2110 previous_certificate_ids,
2111 rng,
2112 )
2113 .unwrap();
2114 Proposal::new(committee, batch_header, transmissions).unwrap()
2116 }
2117
2118 fn peer_signatures_for_proposal(
2121 primary: &Primary<CurrentNetwork>,
2122 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2123 rng: &mut TestRng,
2124 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2125 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2127 for (socket_addr, account) in accounts {
2128 if account.address() == primary.gateway.account().address() {
2129 continue;
2130 }
2131 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2132 let signature = account.sign(&[batch_id], rng).unwrap();
2133 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2134 }
2135
2136 signatures
2137 }
2138
    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

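    /// Creates a batch certificate authored by `primary_address` for the given round, certified by the
    /// remaining accounts, and returns it along with its transmissions.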
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Find the author's account by address.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Sample one solution and one transaction as the certificate's transmissions.
        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header, then certify it with signatures from the other members.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

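    /// Populates storage with one certificate per account for every round from 1 up to (excluding) `round`,
    /// returning the certificate IDs of the last stored round.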
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            // Insert one certificate per account for the current round.
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            // Advance storage to the next round and roll the certificate sets forward.
            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

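    /// Registers each non-primary account in the gateway resolver, so their addresses resolve during tests.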
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        // Skip the primary's own account at index 0.
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
        }
    }

    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; there should now be one.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Propose a batch with no transmissions; an empty batch should still be proposed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; there should now be one.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

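    // Verifies that a proposed batch skips transmissions that are already covered by
    // certificates from previous rounds.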
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs of the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction, and store them on a worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has both transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Insert certificates for the current round, and feed their transmissions to the worker.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker holds the certified transmissions plus the two new ones.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose a batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the two new transmissions, skipping those
        // already covered by the certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_id, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }

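    // Verifies that, under ConsensusVersion::V4, a proposed batch is capped at the spend limit:
    // of the six stored transmissions, presumably the solution plus two transactions fit.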
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary with a committee on ConsensusVersion::V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there are no proposed batches or transmissions yet.
        assert!(primary.proposed_batch.read().is_none());
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Generate a solution and store it on a worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        // Generate and store more transactions than fit under the spend limit.
        for _i in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Propose the batch; it should only include transmissions up to the spend limit,
        // while the rest stay behind in the workers.
        assert!(primary.propose_batch().await.is_ok());
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Ensure the primary considers itself synced with the peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Process the batch proposal from the peer; it should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Mark the peer as being far ahead, so the primary considers itself out of sync.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();

        // Process the batch proposal from the peer; it should fail while not synced.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Ensure the primary considers itself synced with the peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Process the batch proposal from the peer; it should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Process a batch proposal whose advertised round does not match the header; it should fail.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Process a batch proposal whose advertised round does not match the header; it should fail.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Derive a timestamp that is earlier than allowed: less than MIN_BATCH_DELAY_IN_SECS
        // after the author's previous certificate.
        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        // Create a proposal with the invalid (past) timestamp.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Process the batch proposal; the timestamp check should reject it.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

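    // Verifies that a peer proposal exceeding the spend limit is accepted by a primary on
    // ConsensusVersion::V4 but rejected by one on V5.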
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries over the same committee: one on ConsensusVersion::V4 and one on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a proposal that exceeds the spend limit, authored by a third account.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure both primaries are aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to both resolvers.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Ensure both primaries consider themselves synced with the peer.
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // The V4 primary should accept the over-limit proposal.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        // The V5 primary should reject it for exceeding the spend limit.
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

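    // Verifies that no batch is proposed while the propose lock is ahead of the storage round,
    // and that proposing succeeds again once the lock is restored.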
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction, and store them on a worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Save the original lock round, then set the lock ahead of the storage round.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch; nothing should be proposed while the lock is ahead of storage.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the proposal lock.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Propose a batch again; this time it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create and install a proposal for a round ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // Propose a batch; the existing (ahead) proposal should be kept as-is.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create and install a proposal authored by the primary.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // Gather signatures from all the other committee members.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process all of the signatures; quorum should be reached.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was stored and the primary advanced to the next round.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create and install a proposal authored by the primary.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // Gather signatures from all the other committee members.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process all of the signatures; quorum should be reached.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was stored and the primary advanced to the next round.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create and install a proposal authored by the primary.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // Gather signatures from all the other committee members.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process only a single signature; quorum should not be reached.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check that no certificate was stored and the primary stayed in the same round.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create and install a proposal authored by the primary.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // Gather signatures from all the other committee members.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process only a single signature; quorum should not be reached.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check that no certificate was stored and the primary stayed in the same round.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

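    // Verifies that a certificate with missing transmissions is rejected unless those
    // transmissions are explicitly declared as aborted.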
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs of the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction, and store them on a worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has both transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly mark a (non-empty) subset of the transmissions as aborted.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Mark the transmission as aborted.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Keep the transmission.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Without declaring the aborted transmissions, the certificate must fail both checks.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate, declaring the aborted transmissions; this should succeed.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Check the certificate exists, and that the aborted transmissions are tracked
        // by the storage but have no stored payload.
        assert!(primary.storage.contains_certificate(certificate_id));
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}