use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
    sync::SyncCallback,
};

use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
use snarkos_utilities::{CallbackHandle, NodeDataDir};

use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{ConsensusVersion, committee::Committee},
    utilities::flatten_error,
};

use anyhow::Context;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
#[cfg(feature = "metrics")]
use std::time::Instant;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::{Arc, OnceLock},
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::task::JoinHandle;

pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

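/// A callback interface that lets the primary notify the consensus layer (e.g. the BFT)
/// of round advancement and of newly stored batch certificates.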
#[async_trait::async_trait]
pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync {
    /// Returns `true` if the consensus layer is ready for the primary to move past `current_round`.
    fn update_to_next_round(&self, current_round: u64) -> bool;

    /// Inserts a newly stored batch certificate into the consensus layer.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
}

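/// The primary instance of the memory pool.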
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<OnceLock<Vec<Worker<N>>>>,
    /// The callback into the consensus layer, if one was registered.
    primary_callback: Arc<CallbackHandle<Arc<dyn PrimaryCallback<N>>>>,
    /// The currently proposed batch, if the primary has proposed one.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recently proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    #[cfg(feature = "metrics")]
    /// The time at which the current batch proposal was started (used for latency metrics).
    batch_propose_start: Arc<Mutex<Option<Instant>>>,
    /// The recently signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for proposing a batch, holding the latest proposed (or restored) round.
    propose_lock: Arc<TMutex<u64>>,
    /// The node's data directory.
    node_data_dir: NodeDataDir,
}

impl<N: Network> Primary<N> {
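    /// The tolerance for unconfirmed transmissions, set to twice the maximum number of transmissions per batch.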
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        node_data_dir: NodeDataDir,
        dev: Option<u16>,
    ) -> Result<Self> {
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            node_data_dir.clone(),
            dev,
        )?;
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Default::default(),
            primary_callback: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            #[cfg(feature = "metrics")]
            batch_propose_start: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            node_data_dir,
        })
    }

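    /// Loads the proposal cache from disk (if it exists) and restores the proposed batch,
    /// signed proposals, propose lock, and any pending certificates into the primary.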
    async fn load_proposal_cache(&self) -> Result<()> {
        match ProposalCache::<N>::exists(&self.node_data_dir) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
                Ok(proposal_cache) => {
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    let ledger_round = self.ledger.latest_round();
                    let max_gc_rounds = BatchHeader::<N>::MAX_GC_ROUNDS as u64;
                    if latest_certificate_round > ledger_round.saturating_add(max_gc_rounds) {
                        bail!(
                            "The proposal cache (round {latest_certificate_round}) is more than {max_gc_rounds} \
                             rounds ahead of the ledger (round {ledger_round}). \
                             Please restore a more recent ledger snapshot before restarting the node."
                        );
                    }

                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            false => Ok(()),
        }
    }

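    /// Runs the primary instance: registers the consensus callback (if provided), starts the workers,
    /// initializes and runs the sync module, runs the gateway, and starts the primary's event handlers.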
    pub async fn run(
        &self,
        ping: Option<Arc<Ping<N>>>,
        primary_callback: Option<Arc<dyn PrimaryCallback<N>>>,
        sync_callback: Option<Arc<dyn SyncCallback<N>>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        if let Some(callback) = primary_callback {
            self.primary_callback.set(callback)?;
        }

        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let (tx_worker, rx_worker) = init_worker_channels();
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        if self.workers.set(workers).is_err() {
            bail!("Workers already set. `Primary::run` cannot be called more than once.");
        }

        let (sync_sender, sync_receiver) = init_sync_channels();
        self.sync.initialize(sync_callback).await?;
        self.load_proposal_cache().await?;
        self.sync.run(ping, sync_receiver).await?;
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        self.start_handlers(primary_receiver);

        Ok(())
    }

    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.get().expect("Primary is not running yet").len()).expect("Too many workers")
    }

    pub fn workers(&self) -> &[Worker<N>] {
        self.workers.get().expect("Primary is not running yet")
    }

    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers().iter().map(|worker| worker.num_transmissions()).sum()
    }

    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers().iter().map(|worker| worker.num_ratifications()).sum()
    }

    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers().iter().map(|worker| worker.num_solutions()).sum()
    }

    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers().iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers().iter().flat_map(|worker| worker.transmission_ids())
    }

    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers().iter().flat_map(|worker| worker.transmissions())
    }

    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers().iter().flat_map(|worker| worker.solutions())
    }

    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers().iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    pub fn clear_worker_solutions(&self) {
        self.workers().iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
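    /// Proposes a batch for the current round, if the primary is ready.
    ///
    /// This drains the workers of transmissions (up to the batch and spend limits), signs the batch
    /// header, stores it as the proposed batch, and broadcasts the header to all validators for
    /// signing. If a valid proposal for this round already exists, it is re-sent to non-signers instead.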
    pub async fn propose_batch(&self) -> Result<()> {
        // Acquire the proposal lock; it also records the latest proposed round.
        let mut lock_guard = self.propose_lock.lock().await;

        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(());
        }

        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is already a proposal for this round, resend it to the non-signers instead of proposing anew.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure enough time has elapsed since our previous certificate before proposing again.
        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!(
                "{}",
                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
            );
            return Ok(());
        }

        // If we already certified this round, skip proposing and (optionally) advance the round.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(cb) = &*self.primary_callback.get_ref() {
                match cb.update_to_next_round(self.current_round()) {
                    true => (),
                    false => return Ok(()),
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        if round == *lock_guard {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Ensure we are connected to a quorum of the committee before proposing.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check that the previous round has reached quorum (round 0 trivially has).
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Drain the workers, collecting transmissions up to the batch and spend limits.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(
                                    &mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64),
                                )?),
                            }
                        })?;

                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;

                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute the transaction spend cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    _ => continue,
                }

                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Sign the batch header and construct the proposal; on failure, reinsert the drained transmissions.
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;
        // Broadcast the batch header to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        #[cfg(feature = "metrics")]
        {
            *self.batch_propose_start.lock() = Some(Instant::now());
        }
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

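    /// Processes a batch proposal from a peer.
    ///
    /// Verifies that the proposal is well-formed and comes from the claimed committee member,
    /// fetches any missing transmissions, checks the batch header (including the batch spend limit
    /// from `ConsensusVersion::V5` onward), and, if everything passes, signs the batch and sends
    /// the signature back to the proposer.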
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header and ensure the advertised round matches it.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        let batch_author = batch_header.author();

        // Ensure the proposal comes from the connected validator that authored it.
        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // If we already signed a proposal from this author, handle re-sends and equivocation.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Ensure the proposal timestamp is not too soon after the author's previous certificate.
        let previous_round = batch_round.saturating_sub(1);
        if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
        }

        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'",
            );
        }

        // Fetch any transmissions we are missing for this batch header and validate them.
        let mut missing_transmissions =
            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;

        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            let err = err.context(format!(
                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
            ));
            debug!("{}", flatten_error(err));
            return Ok(());
        }

        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        let (storage, header) = (self.storage.clone(), batch_header.clone());

        let Some(missing_transmissions) =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
        else {
            return Ok(());
        };

        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // From ConsensusVersion::V5 onward, enforce the batch spend limit on the proposed transactions.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers().get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64))?)
                            }
                        }
                    })?;

                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
                    if consensus_version < ConsensusVersion::V14 {
                        if let Some(max_tx_size) = consensus_config_value!(N, MAX_TRANSACTION_SIZE, block_height) {
                            if transaction.to_bytes_le()?.len() > max_tx_size {
                                trace!(
                                    "Invalid batch proposal - Batch proposal transaction '{}' - Exceeds maximum transaction size of {max_tx_size} bytes",
                                    fmt_id(transaction_id),
                                );
                                continue;
                            }
                        }
                    }

                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute the transaction spend cost for transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
                    if next_proposal_cost > batch_spend_limit {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            batch_spend_limit
                        );
                    }

                    proposal_cost = next_proposal_cost;
                }
            }
        }

        // Sign the batch, record the signed proposal, and send the signature back to the proposer.
        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

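    /// Processes a batch signature from a peer for our current batch proposal.
    ///
    /// Once the quorum threshold of signatures is reached, the proposal is certified,
    /// stored, and broadcast to the committee.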
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        self.check_proposed_batch_for_expiration()?;

        let BatchSignature { batch_id, signature } = batch_signature;

        let signer = signature.to_address();

        // Ensure the signature comes from the connected validator that produced it.
        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        // Add the signature to the proposal; if quorum is reached, take the proposal for certification.
        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            let mut proposed_batch = self_.proposed_batch.write();
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;

                    if new_signature {
                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                        if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                            return Ok(None);
                        }
                    } else {
                        debug!(
                            "Received a duplicate signature from '{peer_ip}' for batch {batch_id} in round {round}",
                            round = proposal.round()
                        );
                        return Ok(None);
                    }
                }
                None => return Ok(None),
            };
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        // Certify, store, and broadcast the batch; on failure, reinsert the transmissions into the workers.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

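    /// Processes a batch certificate received from a peer, storing it (and any missing ancestors
    /// and transmissions) and advancing the round once a quorum of authors has certified it.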
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, fetching any missing ancestors and transmissions from the peer.
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Advance the round if a quorum has certified this round and we have no newer proposal in flight.
        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        let current_round = self.current_round();

        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
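    /// Starts the primary's background handlers: the primary and worker ping loops, the batch
    /// proposal loop, the round-increment loop, and the receivers for proposals, signatures,
    /// certificates, and unconfirmed solutions and transactions.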
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Broadcast a primary ping (block locators plus our latest certificate) on a fixed interval.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    match certificate {
                        Some(certificate) => certificate,
                        None => continue,
                    }
                };

                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Process incoming primary pings.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Broadcast worker pings on a fixed interval.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers() {
                    worker.broadcast_ping();
                }
            }
        });

        // Periodically attempt to propose a batch.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Process incoming batch proposals.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Process incoming batch signatures.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Process incoming batch certificates.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // Periodically try to advance to the next round once the current round has reached quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Route unconfirmed solutions to their assigned worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers()[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    callback.send(result).ok();
                });
            }
        });

        // Route unconfirmed transactions to their assigned worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = self_.workers().get(worker_id as usize).expect("Invalid worker ID");
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    callback.send(result).ok();
                });
            }
        });
    }

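    /// Clears the proposed batch if it was made for an earlier round, reinserting its
    /// transmissions back into the workers.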
    fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        if is_expired {
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

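    /// Attempts to advance storage to `next_round`, notifying the consensus callback (if set)
    /// and proposing a new batch when the primary is ready.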
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            let is_ready = if let Some(cb) = self.primary_callback.get() {
                cb.update_to_next_round(current_round)
            } else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

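    /// Ensures the primary is still willing to sign a batch for `batch_round`, i.e. the round is
    /// neither too far in the future nor already behind the primary's own progress.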
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        let current_round = self.current_round();
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round())
            && signing_round > batch_round
        {
            bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
        }
        Ok(())
    }

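    /// Checks that `timestamp` is at least `MIN_BATCH_DELAY_IN_SECS` after the author's previous
    /// certificate (or our own last proposal) for `previous_round`.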
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                true => *self.latest_proposed_batch_timestamp.read(),
                false => return Ok(()),
            },
        };

        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

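    /// Converts the proposal into a certificate, stores it, notifies the consensus callback,
    /// broadcasts the certified batch, and advances to the next round.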
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        if let Some(cb) = self.primary_callback.get() {
            cb.add_new_certificate(certificate.clone())
                .await
                .with_context(|| "Failed to add new certificate from primary")?;
        }
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("Our batch with {num_transmissions} transmissions for round {round} was certified!");
        #[cfg(feature = "metrics")]
        if let Some(start) = self.batch_propose_start.lock().take() {
            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
        }
        self.try_increment_to_the_next_round(round + 1).await
    }

    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        assign_to_workers(self.workers(), transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(self.workers(), transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

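    /// Recursively syncs the given certificate from a peer, fetching any missing previous
    /// certificates and transmissions before storing it.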
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        let missing_transmissions =
            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(cb) = self.primary_callback.get() {
                cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
            }
        }
        Ok(())
    }

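    /// Syncs the state needed to validate the given batch header from a peer, fetching missing
    /// transmissions and previous certificates and fast-forwarding the round if we are behind.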
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Fast-forward our round if we are behind schedule or the peer is far ahead of us.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);

        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
            missing_transmissions_handle,
            missing_previous_certificates_handle,
        ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}'"))?;

        for batch_certificate in missing_previous_certificates {
            if CHECK_PREVIOUS_CERTIFICATES {
                self.storage.check_incoming_certificate(&batch_certificate)?;
            }
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }

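    /// Fetches any transmissions referenced by the batch header that are not yet in storage,
    /// requesting them from the peer via the assigned workers.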
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get().expect("No workers set").get(worker_id as usize) else {
                    bail!("Unable to find worker {worker_id}")
                };
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }

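    /// Fetches any previous certificates referenced by the batch header that are not yet known.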
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let round = batch_header.round();
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        Ok(missing_previous_certificates)
    }

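    /// Fetches the given certificates from the peer, skipping those already present in the
    /// ledger, storage, or the unprocessed-certificate cache.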
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let mut fetch_certificates = FuturesUnordered::new();
        let mut missing_certificates = HashSet::default();
        for certificate_id in certificate_ids {
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        Ok(missing_certificates)
    }
}

impl<N: Network> Primary<N> {
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

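    /// Shuts down the primary: stops the workers and background tasks, persists the proposal
    /// cache to disk, and shuts down the gateway.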
2010 pub async fn shut_down(&self) {
2012 info!("Shutting down the primary...");
2013 self.primary_callback.clear();
2015 self.workers().iter().for_each(|worker| worker.shut_down());
2017 self.handles.lock().drain(..).for_each(|handle| handle.abort());
2019 let proposal_cache = {
2021 let proposal = self.proposed_batch.write().take();
2022 let signed_proposals = self.signed_proposals.read().clone();
2023 let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
2024 let pending_certificates = self.storage.get_pending_certificates();
2025 ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2026 };
2027 if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2028 error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2029 }
2030 self.gateway.shut_down().await;
2032 }
2033}
2034
2035#[cfg(test)]
2036mod tests {
2037 use super::*;
2038 use snarkos_node_bft_ledger_service::MockLedgerService;
2039 use snarkos_node_bft_storage_service::BFTMemoryService;
2040 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2041 use snarkvm::{
2042 ledger::{
2043 committee::{Committee, MIN_VALIDATOR_STAKE},
2044 test_helpers::sample_execution_transaction_with_fee,
2045 },
2046 prelude::{Address, Signature},
2047 };
2048
2049 use bytes::Bytes;
2050 use indexmap::IndexSet;
2051 use rand::RngCore;
2052
2053 type CurrentNetwork = snarkvm::prelude::MainnetV0;
2054
2055 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2056 const COMMITTEE_SIZE: usize = 4;
2058 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2059 let mut members = IndexMap::new();
2060
2061 for i in 0..COMMITTEE_SIZE {
2062 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2063 let account = Account::new(rng).unwrap();
2064
2065 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2066 accounts.push((socket_addr, account));
2067 }
2068
2069 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2070 }
2071
2072 fn primary_with_committee(
2074 account_index: usize,
2075 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2076 committee: Committee<CurrentNetwork>,
2077 height: u32,
2078 ) -> Primary<CurrentNetwork> {
2079 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2080 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2081
2082 let account = accounts[account_index].1.clone();
2084 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2085 let primary =
2086 Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2087 .unwrap();
2088
2089 let worker = Worker::new(
2091 0, Arc::new(primary.gateway.clone()),
2093 primary.storage.clone(),
2094 primary.ledger.clone(),
2095 primary.proposed_batch.clone(),
2096 )
2097 .unwrap();
2098 let _ = primary.workers.set(vec![worker]);
2099 for a in accounts.iter().skip(account_index) {
2100 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2101 }
2102
2103 primary
2104 }
2105
2106 fn primary_without_handlers(
2107 rng: &mut TestRng,
2108 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2109 let (accounts, committee) = sample_committee(rng);
2110 let primary = primary_with_committee(
2111 0, &accounts,
2113 committee,
2114 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2115 );
2116
2117 (primary, accounts)
2118 }
2119
2120 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2122 let solution_id = rng.r#gen::<u64>().into();
2124 let size = rng.gen_range(1024..10 * 1024);
2126 let mut vec = vec![0u8; size];
2128 rng.fill_bytes(&mut vec);
2129 let solution = Data::Buffer(Bytes::from(vec));
2130 (solution_id, solution)
2132 }
2133
2134 fn sample_unconfirmed_transaction(
2136 rng: &mut TestRng,
2137 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2138 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2139 let id = transaction.id();
2140
2141 (id, Data::Object(transaction))
2142 }
2143
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        num_transactions: u64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        let mut transmission_ids = IndexSet::new();
        let mut transmissions = IndexMap::new();

        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let solution_transmission_id = (solution_id, solution_checksum).into();
        transmission_ids.insert(solution_transmission_id);
        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));

        for _ in 0..num_transactions {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
            transmission_ids.insert(transaction_transmission_id);
            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
        }

        let private_key = author.private_key();
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }

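    // Collects signatures on the primary's currently proposed batch from every other account.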
    fn peer_signatures_for_proposal(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        rng: &mut TestRng,
    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
        let mut signatures = Vec::with_capacity(accounts.len() - 1);
        for (socket_addr, account) in accounts {
            if account.address() == primary.gateway.account().address() {
                continue;
            }
            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
        }

        signatures
    }

    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

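    // Creates a batch certificate authored by `primary_address` for the given round, signed by the
    // other accounts, along with the transmissions referenced by its header.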
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

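    // Populates storage with a certificate chain for rounds 1 up to (but excluding) `round`,
    // advancing the storage round as it goes, and returns the certificate IDs of the last completed round.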
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

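    // Registers every peer account (all but the primary's own, at index 0) in the gateway resolver.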
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
        }
    }

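    // Proposing with pending transmissions in the worker should produce a proposed batch.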
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

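    // Transmissions already referenced by certificates from earlier rounds must not be re-proposed:
    // only the two freshly submitted transmissions should appear in the new proposal.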
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let mut num_transmissions_in_previous_round = 0;

        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers()[0].num_transmissions(), 2);

        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        assert!(primary.storage.increment_to_next_round(round).is_ok());

        assert_eq!(primary.workers()[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        assert!(primary.propose_batch().await.is_ok());

        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }

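    // With consensus V4 active, proposing stops adding transmissions once the batch limit is hit
    // (presumably the execution spend limit): only 3 of the 6 pending transmissions are expected to
    // make it into the proposal.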
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        assert!(primary.proposed_batch.read().is_none());
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _ in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        assert!(primary.propose_batch().await.is_ok());
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

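    // A peer proposal must be rejected while this node is still syncing: the peer advertises block
    // locators well ahead of the local ledger and no sync attempt has completed.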
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

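    // A peer proposal whose timestamp is less than MIN_BATCH_DELAY_IN_SECS after the peer's previous
    // certificate must be rejected.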
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

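    // The same over-limit proposal is accepted by a primary running consensus V4 but rejected by one
    // running V5, presumably because V5 enforces the spend limit on incoming peer proposals as well.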
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

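    // If the propose lock is ahead of the storage round, propose_batch() should return Ok without
    // producing a proposal; restoring the lock allows a proposal again.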
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        *primary.propose_lock.lock().await = old_proposal_lock_round;

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

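    // Once signatures from all other validators are processed, the proposal should be certified into
    // storage and the primary should advance to the next round.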
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }

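    // A certificate whose transmissions are only partially available can only be inserted when the
    // missing ones are explicitly passed as aborted; aborted IDs are then tracked in storage without
    // a stored transmission payload.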
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        store_certificate_chain(&primary, &accounts, round, &mut rng);

        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert_eq!(primary.workers()[0].num_transmissions(), 2);

        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        assert!(primary.storage.contains_certificate(certificate_id));
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}