use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{ConsensusVersion, committee::Committee},
};

use aleo_std::StorageMode;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

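/// A helper type for the proposed batch, guarded by a read-write lock.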
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

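/// The primary instance of the memory pool, responsible for proposing batches,
/// signing peers' batch proposals, and certifying its own batches.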
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The currently-proposed batch, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recently proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals, keyed by author.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles of the spawned tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock guarding concurrent batch proposals, tracking the latest proposal round.
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode.
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
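    /// The maximum tolerance for the number of transmissions, set to twice the maximum number of transmissions per batch.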
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

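    /// Initializes a new primary instance.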
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

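    /// Loads the proposal cache from the file system, restoring the proposed batch, the signed
    /// proposals, the propose lock round, and any pending certificates.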
    async fn load_proposal_cache(&self) -> Result<()> {
        match ProposalCache::<N>::exists(&self.storage_mode) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Reprocess any pending certificates that were stored in the cache.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the proposal cache from the file system - {err}.");
                }
            },
            false => Ok(()),
        }
    }

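    /// Runs the primary instance: spawns the workers, initializes the sync module,
    /// loads the proposal cache, and starts the gateway and handlers.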
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender, if one was provided.
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct the workers and their channels.
        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let (tx_worker, rx_worker) = init_worker_channels();
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        self.workers = Arc::from(workers);

        // Initialize the sync module, load the proposal cache, and start the gateway and handlers.
        let (sync_sender, sync_receiver) = init_sync_channels();
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run(ping, sync_receiver).await?;
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        self.start_handlers(primary_receiver);

        Ok(())
    }

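    /// Returns the current round.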
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

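    /// Returns `true` if the primary is synced.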
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the currently-proposed batch, if any.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    /// Returns the number of unconfirmed solutions.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    /// Returns the number of unconfirmed transactions.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    /// Clears the worker solutions.
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
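    /// Proposes the batch for the current round.
    ///
    /// This method performs the following steps:
    /// 1. Drain the workers.
    /// 2. Sign the batch.
    /// 3. Set the batch proposal in the primary.
    /// 4. Broadcast the batch header to all validators for signing.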
    pub async fn propose_batch(&self) -> Result<()> {
        // This method is not re-entrant; hold the propose lock for its duration.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if so.
        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        // Retrieve the current and previous rounds.
        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current round is behind the latest proposal round, return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is already a proposed batch, re-broadcast it to any non-signers if it is still valid.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // If the proposal is ahead of storage, or references certificates missing from storage, return early.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Resend the batch proposal to any validators that have not signed it yet.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                match self.gateway.resolver().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure the primary does not propose a new batch too soon after the previous one.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(());
        }

        // If this round was already certified by this primary, skip proposing and try to advance.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // The BFT is ready to advance to the next round.
                    Ok(true) => (),
                    // The BFT is not ready to advance; return early.
                    Ok(false) => return Ok(()),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // If this round was already proposed, return early.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee lookback for the current round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check that the primary is connected to enough validators to reach the quorum threshold.
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed; round 1 has no previous certificates.
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // The batch is ready once the previous round's certificates reach the quorum threshold.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Drain the workers, accumulating transmissions and the total proposal cost.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        // Note: the draining logic below currently assumes a single worker.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // If the batch is full, re-queue the transmission and stop draining entirely.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // If this worker has contributed its maximum, re-queue the transmission and move to the next worker.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Skip transmissions that are already in the ledger.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Skip transmissions that are already in storage.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check the transmission is well-formed before including it.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check the solution is well-formed.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction on a blocking task.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Deployment transactions are not allowed between Consensus V7 and V8.
                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Check the transaction is well-formed.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the transaction's spent cost; discard the transaction if this fails.
                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(
                            transaction_id,
                            transaction,
                            consensus_version,
                        ) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Accumulate the proposal cost, discarding the transaction on overflow.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // If the batch spend limit would be exceeded, re-queue the transmission and stop draining.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    // Ratifications are not supported in batches.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // Skip mismatched ID/transmission pairs.
                    _ => continue,
                }

                // Include the transmission in the proposal.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record that this round is being proposed.
        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Sign the batch header and construct the proposal; on failure, reinsert the transmissions into the workers.
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Record the proposal timestamp and store the proposal.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

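    /// Processes a batch proposal from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Verify the batch header and its author.
    /// 2. Sync with the batch header, fetching any missing transmissions.
    /// 3. Sign the batch and record the signed proposal.
    /// 4. Send the signature back to the proposing validator.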
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header on a blocking task.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // Ensure the advertised round matches the batch header's round.
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        // Retrieve the batch author.
        let batch_author = batch_header.author();

        // Ensure the batch proposal is from the peer that sent it.
        match self.gateway.resolver().get_address(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // Ensure the batch author is an authorized committee member.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // Ensure the batch proposal is not from this primary.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // Ensure the batch proposal is for the expected committee.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // If this author's proposal was already signed, handle the overlap cases.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // The peer is proposing a batch below the latest signed round; reject it.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // The peer is proposing a different batch for an already-signed round; reject it.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // The peer re-sent a batch that was already signed; resend the signature.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        // If the batch is already certified in storage, skip it.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Ensure the batch proposal's timestamp respects the minimum batch delay.
        let previous_round = batch_round.saturating_sub(1);
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        // Reject batches that contain ratification transmissions.
        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'");
        }

        // Sync with the batch header, fetching any missing transmissions and previous certificates.
        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        // Ensure the missing transmissions are well-formed.
        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}");
            return Ok(());
        }

        // Ensure this primary is still willing to sign for the batch round.
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Check the batch header against storage, and insert any missing transmissions into the workers.
        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // From Consensus V5 onwards, enforce the batch spend limit on the proposal.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    // Deserialize the transaction on a blocking task.
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    // Deployment transactions are not allowed between Consensus V7 and V8.
                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
                    if block_height > consensus_version_v7_height
                        && block_height <= consensus_version_v8_height
                        && transaction.is_deploy()
                    {
                        bail!(
                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
                        )
                    }

                    // Compute the transaction's spent cost.
                    let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(
                        *transaction_id,
                        transaction,
                        consensus_version,
                    ) else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Accumulate the proposal cost, rejecting the proposal on overflow.
                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Reject the proposal if it exceeds the batch spend limit.
                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
                    if next_proposal_cost > batch_spend_limit {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            batch_spend_limit
                        );
                    }

                    proposal_cost = next_proposal_cost;
                }
            }
        }

        // Sign the batch ID on a blocking task.
        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        // Record the signed proposal, ensuring at most one batch is signed per (author, round).
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If a batch from this author was already signed for this round, do not sign again.
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Send the signature back to the proposing validator.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

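    /// Processes a batch signature from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Ensure the proposed batch has not expired.
    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
    /// 3. Store the signature in the proposal.
    /// 4. If the quorum threshold is reached, certify and broadcast the batch.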
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        // Ensure the proposed batch has not expired.
        self.check_proposed_batch_for_expiration().await?;

        let BatchSignature { batch_id, signature } = batch_signature;

        // Recover the signer's address from the signature.
        let signer = signature.to_address();

        // Ensure the batch signature is from the peer that sent it.
        if self.gateway.resolver().get_address(peer_ip) != Some(signer) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        // Ensure the batch signature is not from this primary.
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        // Add the signature to the proposal; if quorum is reached, take the proposal out for certification.
        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            let mut proposed_batch = self_.proposed_batch.write();
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    // Ensure the signature is for the proposed batch ID.
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    // Retrieve the committee lookback for the proposal round.
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    // Add the signature to the proposal.
                    proposal.add_signature(signer, signature, &committee_lookback)?;
                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                    // If the quorum threshold has not been reached yet, return early.
                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                        return Ok(None);
                    }
                }
                None => return Ok(None),
            };
            // Quorum reached: take the proposal out of the primary.
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        // Store and broadcast the certified batch; on failure, reinsert the transmissions into the workers.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

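    /// Processes a batch certificate from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Stores the given batch certificate, after ensuring it is well-formed.
    /// 2. Attempts to advance to the next round, if there are enough certificates.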
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the batch certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // If the certificate is already stored, return early; otherwise, buffer it as unprocessed.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from this primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Check the incoming certificate against storage.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after fetching any missing transmissions and previous certificates.
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // Retrieve the committee lookback for the certificate round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Check if the certificates for this round have reached the quorum threshold.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure the certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Only advance if there is no pending proposal at or above this round.
        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        let current_round = self.current_round();

        // If quorum is reached and it is safe to advance, increment to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
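    /// Starts the primary handlers: the ping loops, the batch proposer, the round advancer,
    /// and the receivers for proposals, signatures, certificates, solutions, and transactions.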
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Broadcast a primary ping periodically.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve this primary's most recent batch certificate.
                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    // Search backwards from the current round for the latest certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    match certificate {
                        Some(certificate) => certificate,
                        // Skip this ping if no certificate exists yet.
                        None => continue,
                    }
                };

                // Construct and broadcast the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Process incoming primary pings.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Process the peer's batch certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Broadcast worker pings periodically.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Periodically try to propose a batch.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // If a proposal is already in flight, skip this tick.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                }
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Process incoming batch proposals.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Process incoming batch signatures.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let id = fmt_id(batch_signature.batch_id);
                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
                }
            }
        });

        // Process incoming batch certificates.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Periodically check whether the current round has reached quorum, and advance if so.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process unconfirmed solutions, assigning each to the appropriate worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Report the result back through the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Process unconfirmed transactions, assigning each to the appropriate worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Report the result back through the callback.
                    callback.send(result).ok();
                });
            }
        });
    }

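    /// Checks whether the proposed batch has expired, and if so, clears it and
    /// reinserts its transmissions into the workers.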
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        // A proposal is expired once the primary has advanced past its round.
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        if is_expired {
            // Clear the expired proposal and return its transmissions to the workers.
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

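    /// Increments to the next round, fast-forwarding storage if needed and
    /// notifying the BFT (if one is attached).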
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, fast-forward storage to the round preceding it.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear any proposal from the skipped round.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // If a BFT sender is set, defer the round update to the BFT; otherwise, increment storage directly.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            } else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // If the primary is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

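    /// Ensures the primary is still signing for the given batch round, and that
    /// the round is within the GC window.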
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        let current_round = self.current_round();
        // Reject rounds beyond the GC horizon.
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        // Reject rounds more than one round behind the primary.
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        // Reject rounds below our own pending proposal's round.
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

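    /// Checks that the given proposal timestamp is at least `MIN_BATCH_DELAY_IN_SECS` after
    /// the author's previous certificate (or our latest proposal, for our own batches).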
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the timestamp of the author's certificate in the previous round, if it exists.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                // For our own proposals, fall back to the latest proposed batch timestamp.
                true => *self.latest_proposed_batch_timestamp.read(),
                false => return Ok(()),
            },
        };

        // Ensure the elapsed time is at least the minimum batch delay.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

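    /// Stores and broadcasts the certified batch, then attempts to advance to the next round.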
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions from the proposal.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap for storage.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch on a blocking task.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender is set, send the certificate to the BFT DAG.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Try to increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }

    /// Inserts the given transmissions from a peer into the workers.
    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    /// Re-inserts the given transmissions back into the workers.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

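    /// Recursively syncs using the given batch certificate.
    ///
    /// This method performs the following steps:
    /// 1. Ensure the certificate is not already in storage and is within the GC window.
    /// 2. Sync with the batch header, fetching any missing transmissions and previous certificates.
    /// 3. Store the certificate, and send it to the BFT DAG (if one is attached).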
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // If the certificate's round is at or below the GC round, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the node is not in sync mode and not synced, reject the certificate.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync with the batch header, fetching any missing transmissions and previous certificates.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // If the certificate still does not exist in storage, insert it.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate on a blocking task.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender is set, send the certificate to the BFT DAG.
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

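    /// Syncs with the given batch header, fetching any missing transmissions and previous
    /// certificates from the peer, and returns the missing transmissions.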
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // If the peer's batch round is at or below the GC round, reject it.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If the node is not in sync mode and not synced, reject the batch header.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Check if the batch round has already reached the quorum threshold.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // If quorum was reached for a later round, or the peer is far ahead, fast-forward to the batch round.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Fetch the missing transmissions and previous certificates concurrently.
        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
        let (missing_transmissions, missing_previous_certificates) =
            tokio::try_join!(missing_transmissions_handle, missing_previous_certificates_handle).map_err(|e| {
                anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
            })?;

        // Recursively sync with each missing previous certificate.
        for batch_certificate in missing_previous_certificates {
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }

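    /// Fetches any missing transmissions for the given batch header from the specified peer.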
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // If the round is at or below the GC round, there is nothing to fetch.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // If the batch already exists in storage, there is nothing to fetch.
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // Launch a fetch for each transmission that is missing from storage.
        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        // Collect the fetched transmissions.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }

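    /// Fetches any missing previous certificates for the given batch header from the specified peer.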
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let round = batch_header.round();
        // Round 1 has no previous certificates, and rounds at or below the GC boundary are skipped.
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        // Fetch any missing previous certificates from the peer.
        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        Ok(missing_previous_certificates)
    }

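    /// Fetches any missing certificates among the given certificate IDs from the specified peer.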
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let mut fetch_certificates = FuturesUnordered::new();
        let mut missing_certificates = HashSet::default();
        for certificate_id in certificate_ids {
            // Skip certificates that already exist in the ledger.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            // Skip certificates that already exist in storage.
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // Use a buffered unprocessed certificate if available; otherwise, request it from the peer.
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Collect the fetched certificates.
        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
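    /// Spawns the given future onto a task, storing its handle for shutdown.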
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

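    /// Shuts down the primary: the workers, the spawned handlers, the proposal cache
    /// (persisted to disk), and the gateway.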
1937 pub async fn shut_down(&self) {
1939 info!("Shutting down the primary...");
1940 self.workers.iter().for_each(|worker| worker.shut_down());
1942 self.handles.lock().iter().for_each(|handle| handle.abort());
1944 let proposal_cache = {
1946 let proposal = self.proposed_batch.write().take();
1947 let signed_proposals = self.signed_proposals.read().clone();
1948 let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1949 let pending_certificates = self.storage.get_pending_certificates();
1950 ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1951 };
1952 if let Err(err) = proposal_cache.store(&self.storage_mode) {
1953 error!("Failed to store the current proposal cache: {err}");
1954 }
1955 self.gateway.shut_down().await;
1957 }
1958}
1959
1960#[cfg(test)]
1961mod tests {
1962 use super::*;
1963 use snarkos_node_bft_ledger_service::MockLedgerService;
1964 use snarkos_node_bft_storage_service::BFTMemoryService;
1965 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
1966 use snarkvm::{
1967 ledger::{
1968 committee::{Committee, MIN_VALIDATOR_STAKE},
1969 snarkvm_ledger_test_helpers::sample_execution_transaction_with_fee,
1970 },
1971 prelude::{Address, Signature},
1972 };
1973
1974 use bytes::Bytes;
1975 use indexmap::IndexSet;
1976 use rand::RngCore;
1977
1978 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1979
1980 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1981 const COMMITTEE_SIZE: usize = 4;
1983 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1984 let mut members = IndexMap::new();
1985
1986 for i in 0..COMMITTEE_SIZE {
1987 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1988 let account = Account::new(rng).unwrap();
1989
1990 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1991 accounts.push((socket_addr, account));
1992 }
1993
1994 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1995 }
1996
1997 fn primary_with_committee(
1999 account_index: usize,
2000 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2001 committee: Committee<CurrentNetwork>,
2002 height: u32,
2003 ) -> Primary<CurrentNetwork> {
2004 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2005 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2006
2007 let account = accounts[account_index].1.clone();
2009 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2010 let mut primary =
2011 Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None), None).unwrap();
2012
2013 primary.workers = Arc::from([Worker::new(
2015 0, Arc::new(primary.gateway.clone()),
2017 primary.storage.clone(),
2018 primary.ledger.clone(),
2019 primary.proposed_batch.clone(),
2020 )
2021 .unwrap()]);
2022 for a in accounts.iter().skip(account_index) {
2023 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2024 }
2025
2026 primary
2027 }
2028
2029 fn primary_without_handlers(
2030 rng: &mut TestRng,
2031 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2032 let (accounts, committee) = sample_committee(rng);
2033 let primary = primary_with_committee(
2034 0, &accounts,
2036 committee,
2037 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2038 );
2039
2040 (primary, accounts)
2041 }
2042
2043 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2045 let solution_id = rng.r#gen::<u64>().into();
2047 let size = rng.gen_range(1024..10 * 1024);
2049 let mut vec = vec![0u8; size];
2051 rng.fill_bytes(&mut vec);
2052 let solution = Data::Buffer(Bytes::from(vec));
2053 (solution_id, solution)
2055 }
2056
2057 fn sample_unconfirmed_transaction(
2059 rng: &mut TestRng,
2060 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2061 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2062 let id = transaction.id();
2063
2064 (id, Data::Object(transaction))
2065 }
2066
2067 fn create_test_proposal(
2069 author: &Account<CurrentNetwork>,
2070 committee: Committee<CurrentNetwork>,
2071 round: u64,
2072 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2073 timestamp: i64,
2074 num_transactions: u64,
2075 rng: &mut TestRng,
2076 ) -> Proposal<CurrentNetwork> {
2077 let mut transmission_ids = IndexSet::new();
2078 let mut transmissions = IndexMap::new();
2079
2080 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2082 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2083 let solution_transmission_id = (solution_id, solution_checksum).into();
2084 transmission_ids.insert(solution_transmission_id);
2085 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2086
2087 for _ in 0..num_transactions {
2089 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2090 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2091 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2092 transmission_ids.insert(transaction_transmission_id);
2093 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2094 }
2095
2096 let private_key = author.private_key();
2098 let batch_header = BatchHeader::new(
2100 private_key,
2101 round,
2102 timestamp,
2103 committee.id(),
2104 transmission_ids,
2105 previous_certificate_ids,
2106 rng,
2107 )
2108 .unwrap();
2109 Proposal::new(committee, batch_header, transmissions).unwrap()
2111 }
2112
2113 fn peer_signatures_for_proposal(
2116 primary: &Primary<CurrentNetwork>,
2117 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2118 rng: &mut TestRng,
2119 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2120 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2122 for (socket_addr, account) in accounts {
2123 if account.address() == primary.gateway.account().address() {
2124 continue;
2125 }
2126 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2127 let signature = account.sign(&[batch_id], rng).unwrap();
2128 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2129 }
2130
2131 signatures
2132 }
2133
    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

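    /// Creates a batch certificate authored by `primary_address` for the given round, certified by
    /// the other accounts' signatures, and returns it together with its transmissions.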
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Resolve the author's account from its address.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Sample a random committee ID, plus one solution and one transaction as the batch transmissions.
        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header, then certify it with the other accounts' signatures.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

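    /// Fills storage with one certificate per account for every round in `1..round`, advancing the
    /// storage round as it goes, and returns the certificate IDs of the last populated round.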
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            // Insert one certificate per account for the current round.
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            // Advance the storage to the next round and roll the certificate sets forward.
            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

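    /// Registers each peer account with the gateway resolver so the primary can resolve their addresses.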
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        // Account 0 is the primary itself, so only its peers are registered.
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
        }
    }

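    // Checks that a primary holding a pending solution and transaction produces a batch proposal.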
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch and check the proposal was stored.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Propose a batch with no transmissions and check it was successful.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for the minimum batch delay so the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch and check the proposal was stored.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

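    // Checks that transmissions already covered by certificates from earlier rounds are skipped,
    // so a new proposal contains only the transmissions that have not yet been certified.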
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs of the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a fresh solution and transaction that are not in any certificate.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Insert a certificate for each committee member in the current round, and replay each
        // certificate's transmissions into the worker as if received from a peer.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for the minimum batch delay so the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch; only the two fresh transmissions should be included.
        assert!(primary.propose_batch().await.is_ok());

        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }

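    // Checks the proposal-side spend limit under ConsensusVersion::V4: of the six queued
    // transmissions, only three are drained into the proposal and three remain in the workers.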
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary using a committee pinned to ConsensusVersion::V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed and the workers are empty.
        assert!(primary.proposed_batch.read().is_none());
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Store a solution and five transactions on one of the workers.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _ in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Propose the batch and check that only three transmissions were included,
        // with the remaining three left queued in the workers.
        assert!(primary.propose_batch().await.is_ok());
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make the primary aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's IP and address with the gateway resolver.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // Update the peer's block locators and run block sync so the primary considers itself synced.
        primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.try_block_sync().await;

        // Processing the peer's batch proposal should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make the primary aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's IP and address with the gateway resolver.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // Advertise peer locators at height 20 without syncing, leaving the primary behind.
        primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(20)).unwrap();

        // Processing the peer's batch proposal should fail while the primary is not synced.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make the primary aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's IP and address with the gateway resolver.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // Update the peer's block locators and run block sync so the primary considers itself synced.
        primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.try_block_sync().await;

        // Processing the peer's batch proposal should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make the primary aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's IP and address with the gateway resolver, and run block sync.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.try_block_sync().await;

        // Processing the batch proposal under the wrong round should fail.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make the primary aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's IP and address with the gateway resolver, and run block sync.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.try_block_sync().await;

        // Processing the batch proposal under the wrong round should fail.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Compute an invalid timestamp: one second short of the minimum delay after the author's
        // previous certificate.
        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        // Make the primary aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's IP and address with the gateway resolver, and run block sync.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary.sync.try_block_sync().await;

        // Processing the batch proposal with the past timestamp should fail.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

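    // Checks the validation-side spend limit: the same over-limit peer proposal is accepted by a
    // primary at ConsensusVersion::V4 but rejected by one at ConsensusVersion::V5.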
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries on the same committee, pinned to different consensus versions.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a proposal with four transactions, authored by a third committee member.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make both primaries aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Register the peer's IP and address with both gateway resolvers.
        primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // Sync both primaries with the peer.
        primary_v4.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.try_block_sync().await;

        primary_v5.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.try_block_sync().await;

        // The V4 primary accepts the proposal.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        // The V5 primary rejects the same proposal.
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

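    // Checks that propose_batch is a no-op while the propose lock points at a later round, and
    // that proposing succeeds again once the lock is restored.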
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the propose lock to a round ahead of the storage round.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch; nothing should be proposed while the lock is ahead.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the propose lock to its previous round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Propose a batch again and check it was successful.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a proposal for a round ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Propose a batch; the existing ahead-of-round proposal should be left in place.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a proposal authored by the primary.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Collect a signature from every peer.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process each signature; once quorum is reached the batch is certified.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was stored and the primary advanced to the next round.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a proposal authored by the primary.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Collect a signature from every peer.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process each signature; once quorum is reached the batch is certified.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was stored and the primary advanced to the next round.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a proposal authored by the primary.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Collect a signature from every peer.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process a single signature; quorum is not reached.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check no certificate was stored and the round did not advance.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Fill the storage with certificates up to the target round.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a proposal authored by the primary.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Collect a signature from every peer.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Process a single signature; quorum is not reached.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check no certificate was stored and the round did not advance.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

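    // Checks that a certificate missing some of its transmissions is rejected unless those
    // transmissions are explicitly declared aborted, and that aborted IDs are then tracked in
    // storage without a retrievable payload.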
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs of the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate for the current round, authored by the primary's own account.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly partition the certificate's transmissions into aborted and available sets;
        // the first transmission is always aborted, so the aborted set is never empty.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Mark the transmission as aborted.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Keep the transmission available.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Make the available transmissions known to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Without declaring the aborted transmissions, the certificate fails both validation and insertion.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate along with the aborted transmission IDs; this should succeed.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Check the certificate is stored, and each aborted ID is tracked without a payload.
        assert!(primary.storage.contains_certificate(certificate_id));
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}