1use crate::{
17 Gateway,
18 MAX_BATCH_DELAY_IN_MS,
19 MAX_WORKERS,
20 MIN_BATCH_DELAY_IN_SECS,
21 PRIMARY_PING_IN_MS,
22 Sync,
23 Transport,
24 WORKER_PING_IN_MS,
25 Worker,
26 events::{BatchPropose, BatchSignature, Event},
27 helpers::{
28 BFTSender,
29 PrimaryReceiver,
30 PrimarySender,
31 Proposal,
32 ProposalCache,
33 SignedProposals,
34 Storage,
35 assign_to_worker,
36 assign_to_workers,
37 fmt_id,
38 init_sync_channels,
39 init_worker_channels,
40 now,
41 },
42 spawn_blocking,
43};
44
45use snarkos_account::Account;
46use snarkos_node_bft_events::PrimaryPing;
47use snarkos_node_bft_ledger_service::LedgerService;
48use snarkos_node_network::PeerPoolHandling;
49use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
50use snarkos_utilities::NodeDataDir;
51
52use snarkvm::{
53 console::{
54 prelude::*,
55 types::{Address, Field},
56 },
57 ledger::{
58 block::Transaction,
59 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
60 puzzle::{Solution, SolutionID},
61 },
62 prelude::{ConsensusVersion, committee::Committee},
63 utilities::flatten_error,
64};
65
66use anyhow::Context;
67use colored::Colorize;
68use futures::stream::{FuturesUnordered, StreamExt};
69use indexmap::{IndexMap, IndexSet};
70#[cfg(feature = "locktick")]
71use locktick::{
72 parking_lot::{Mutex, RwLock},
73 tokio::Mutex as TMutex,
74};
75#[cfg(not(feature = "locktick"))]
76use parking_lot::{Mutex, RwLock};
77#[cfg(not(feature = "serial"))]
78use rayon::prelude::*;
79#[cfg(feature = "metrics")]
80use std::time::Instant;
81use std::{
82 collections::{HashMap, HashSet},
83 future::Future,
84 net::SocketAddr,
85 sync::Arc,
86 time::Duration,
87};
88#[cfg(not(feature = "locktick"))]
89use tokio::sync::Mutex as TMutex;
90use tokio::{sync::OnceCell, task::JoinHandle};
91
/// A shared, lockable slot holding the batch proposal (if any) that this primary
/// currently has outstanding; `None` once the proposal is certified or expired.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
94
/// The primary instance of the memory pool: it proposes batches, collects peer
/// signatures, and certifies batches on behalf of this validator.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway used to communicate with peers.
    gateway: Gateway<N>,
    /// The storage for rounds, certificates, and transmissions.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers holding the unconfirmed transmissions.
    workers: Arc<[Worker<N>]>,
    /// The sender to the BFT, set at most once in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The currently-proposed batch, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the latest proposed batch (set from `proposal.timestamp()`).
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The instant at which the latest batch proposal began (metrics builds only).
    #[cfg(feature = "metrics")]
    batch_propose_start: Arc<Mutex<Option<Instant>>>,
    /// The proposals this primary has signed, keyed by batch author.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles of the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Serializes batch proposing; holds the latest proposed round
    /// (initialized from the proposal cache's latest certificate round).
    propose_lock: Arc<TMutex<u64>>,
    /// The node's data directory (used for the proposal cache on disk).
    node_data_dir: NodeDataDir,
}
128
impl<N: Network> Primary<N> {
    /// The maximum number of transmissions tolerated beyond a full batch
    /// (twice the per-batch maximum).
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// Constructs the gateway and the sync module; the workers are created later,
    /// in [`Self::run`].
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        node_data_dir: NodeDataDir,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Construct the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            node_data_dir.clone(),
            dev,
        )?;
        // Construct the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        // Initialize the primary; `workers` starts empty and is populated in `run`.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            #[cfg(feature = "metrics")]
            batch_propose_start: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            node_data_dir,
        })
    }

    /// Loads the proposal cache (if one exists on disk) and restores the proposed
    /// batch, the signed proposals, the propose lock round, and any pending
    /// certificates into this primary.
    ///
    /// # Errors
    /// Returns an error if the cache file exists but cannot be read.
    async fn load_proposal_cache(&self) -> Result<()> {
        // If the cache file does not exist, there is nothing to restore.
        match ProposalCache::<N>::exists(&self.node_data_dir) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
                Ok(proposal_cache) => {
                    // Destructure the cache into its components.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Restore the in-memory proposal state.
                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Replay the pending certificates as if received from ourselves;
                    // failures are logged but do not abort the load.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            false => Ok(()),
        }
    }

    /// Runs the primary: spawns the workers, initializes the sync module, restores
    /// the proposal cache, starts the gateway, and launches the background handlers.
    ///
    /// Note: the startup order below is deliberate — the proposal cache must be
    /// loaded after sync is initialized and before the gateway starts serving peers.
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender (at most once).
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct and start the workers, collecting each worker's sender.
        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let (tx_worker, rx_worker) = init_worker_channels();
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        self.workers = Arc::from(workers);

        // Initialize sync, restore cached proposals, then start sync, the gateway,
        // and the background handlers.
        let (sync_sender, sync_receiver) = init_sync_channels();
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run(ping, sync_receiver).await?;
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round from storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the node is synced with the network.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    /// Panics if more than `u8::MAX` workers exist.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the currently-proposed batch slot.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
320
321impl<N: Network> Primary<N> {
322 pub fn num_unconfirmed_transmissions(&self) -> usize {
324 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
325 }
326
327 pub fn num_unconfirmed_ratifications(&self) -> usize {
329 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
330 }
331
332 pub fn num_unconfirmed_solutions(&self) -> usize {
334 self.workers.iter().map(|worker| worker.num_solutions()).sum()
335 }
336
337 pub fn num_unconfirmed_transactions(&self) -> usize {
339 self.workers.iter().map(|worker| worker.num_transactions()).sum()
340 }
341}
342
343impl<N: Network> Primary<N> {
344 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
346 self.workers.iter().flat_map(|worker| worker.transmission_ids())
347 }
348
349 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
351 self.workers.iter().flat_map(|worker| worker.transmissions())
352 }
353
354 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
356 self.workers.iter().flat_map(|worker| worker.solutions())
357 }
358
359 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
361 self.workers.iter().flat_map(|worker| worker.transactions())
362 }
363}
364
365impl<N: Network> Primary<N> {
366 pub fn clear_worker_solutions(&self) {
368 self.workers.iter().for_each(Worker::clear_solutions);
369 }
370}
371
372impl<N: Network> Primary<N> {
    /// Proposes a batch for the current round, if all preconditions are met.
    ///
    /// Holds `propose_lock` for the entire call so only one proposal can be in
    /// flight at a time. Safely returns `Ok(())` (without proposing) whenever a
    /// precondition fails; returns `Err` only on unrecoverable conditions.
    pub async fn propose_batch(&self) -> Result<()> {
        // Acquire the propose lock; the guard holds the latest proposed round.
        let mut lock_guard = self.propose_lock.lock().await;

        // Expire any stale proposal first; a failure here is logged, not fatal.
        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .await
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(());
        }

        // Retrieve the current and previous rounds.
        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // Do not propose for a round older than the cached latest proposed round.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If a proposal already exists, re-broadcast it to non-signers instead of
        // creating a new one.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // The existing proposal is only valid if storage has caught up to it:
            // its round must not be ahead of ours and all of its previous
            // certificates must be present in storage.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Resend the existing proposal to every committee member that has not
            // signed it yet and is currently connected.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        // Send asynchronously so one slow peer does not block the rest.
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    // Skip non-signers that are not connected.
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Enforce the minimum delay since our last proposal in the previous round.
        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!(
                "{}",
                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
            );
            return Ok(());
        }

        // If we already certified a batch in this round, advance the BFT instead
        // of proposing again.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(()),
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", &flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Do not double-propose for the round recorded in the lock guard.
        if round == *lock_guard {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Require a quorum of committee members to be connected (counting ourselves).
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // The previous round must have reached quorum (round 0 trivially has).
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Drain transmissions from the workers into the new batch, enforcing the
        // per-batch cap, the per-worker cap, and the batch spend limit.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        // NOTE(review): the draining logic assumes a single worker — confirm before
        // raising MAX_WORKERS.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Batch is full: put the transmission back and stop entirely.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Worker quota reached: put it back and move to the next worker.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Skip (and drop) transmissions already in the ledger; an errored
                // lookup is treated as "present" to be safe.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Skip transmissions already in storage (except the first slot).
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Validate the transmission by kind before admitting it.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // The checksum in the ID must match the payload.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // The checksum in the ID must match the payload.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction off the async runtime.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Deployments are disallowed in the (V7, V8] height window.
                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Discard transactions whose spend cost cannot be computed.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Guard against u64 overflow of the accumulated cost.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Stop the batch once the spend limit would be exceeded;
                        // the transaction goes back to the worker for a later batch.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    // Ratifications are not batched.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // Mismatched (ID, payload) pairs are dropped.
                    _ => continue,
                }

                // Admit the transmission into the batch.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        // Record this round as proposed before broadcasting.
        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Sign the batch header and build the proposal off the async runtime; on
        // failure, the drained transmissions are returned to the workers.
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;
        // Broadcast the proposal and record it locally.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        #[cfg(feature = "metrics")]
        {
            *self.batch_propose_start.lock() = Some(Instant::now());
        }
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }
741
742 async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
752 let BatchPropose { round: batch_round, batch_header } = batch_propose;
753
754 let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
756 if batch_round != batch_header.round() {
758 self.gateway.disconnect(peer_ip);
760 bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
761 }
762
763 let batch_author = batch_header.author();
765
766 match self.gateway.resolve_to_aleo_addr(peer_ip) {
768 Some(address) => {
770 if address != batch_author {
771 self.gateway.disconnect(peer_ip);
773 bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
774 }
775 }
776 None => bail!("Batch proposal from a disconnected validator"),
777 }
778 if !self.gateway.is_authorized_validator_address(batch_author) {
780 self.gateway.disconnect(peer_ip);
782 bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
783 }
784 if self.gateway.account().address() == batch_author {
786 bail!("Invalid peer - proposed batch from myself ({batch_author})");
787 }
788
789 let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
791 if expected_committee_id != batch_header.committee_id() {
792 self.gateway.disconnect(peer_ip);
794 bail!(
795 "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
796 batch_header.committee_id()
797 );
798 }
799
800 if let Some((signed_round, signed_batch_id, signature)) =
802 self.signed_proposals.read().get(&batch_author).copied()
803 {
804 if signed_round > batch_header.round() {
807 bail!(
808 "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
809 batch_header.round()
810 );
811 }
812
813 if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
815 bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
816 }
817 if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
820 let gateway = self.gateway.clone();
821 tokio::spawn(async move {
822 debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
823 let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
824 if gateway.send(peer_ip, event).await.is_none() {
826 warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
827 }
828 });
829 return Ok(());
831 }
832 }
833
834 if self.storage.contains_batch(batch_header.batch_id()) {
837 debug!(
838 "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
839 format!("batch for round {batch_round} already exists in storage").dimmed()
840 );
841 return Ok(());
842 }
843
844 let previous_round = batch_round.saturating_sub(1);
846 if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
848 self.gateway.disconnect(peer_ip);
850 return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
851 }
852
853 if batch_header.contains(TransmissionID::Ratification) {
855 self.gateway.disconnect(peer_ip);
857 bail!(
858 "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
859 );
860 }
861
862 let mut missing_transmissions =
864 self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;
865
866 if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
868 self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
870 }) {
871 let err = err.context(format!(
872 "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
873 ));
874 debug!("{}", flatten_error(err));
875 return Ok(());
876 }
877
878 if let Err(e) = self.ensure_is_signing_round(batch_round) {
882 debug!("{e} from '{peer_ip}'");
884 return Ok(());
885 }
886
887 let (storage, header) = (self.storage.clone(), batch_header.clone());
889
890 let Some(missing_transmissions) =
892 spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
893 else {
894 return Ok(());
895 };
896
897 self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
899
900 let block_height = self.ledger.latest_block_height() + 1;
902 if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
903 let mut proposal_cost = 0u64;
904 for transmission_id in batch_header.transmission_ids() {
905 let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
906 let Some(worker) = self.workers.get(worker_id as usize) else {
907 debug!("Unable to find worker {worker_id}");
908 return Ok(());
909 };
910
911 let Some(transmission) = worker.get_transmission(*transmission_id) else {
912 debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
913 return Ok(());
914 };
915
916 if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
918 (transmission_id, transmission)
919 {
920 let transaction = spawn_blocking!({
922 match transaction {
923 Data::Object(transaction) => Ok(transaction),
924 Data::Buffer(bytes) => {
925 Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
926 }
927 }
928 })?;
929
930 let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
934 let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
935 let consensus_version = N::CONSENSUS_VERSION(block_height)?;
936 if block_height > consensus_version_v7_height
937 && block_height <= consensus_version_v8_height
938 && transaction.is_deploy()
939 {
940 bail!(
941 "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
942 )
943 }
944
945 let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
948 else {
949 bail!(
950 "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
951 fmt_id(transaction_id)
952 )
953 };
954
955 let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
958 bail!(
959 "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
960 fmt_id(transaction_id)
961 )
962 };
963
964 let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
966 if next_proposal_cost > batch_spend_limit {
967 bail!(
968 "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
969 fmt_id(transaction_id),
970 batch_spend_limit
971 );
972 }
973
974 proposal_cost = next_proposal_cost;
976 }
977 }
978 }
979
980 let batch_id = batch_header.batch_id();
984 let account = self.gateway.account().clone();
986 let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
987
988 match self.signed_proposals.write().0.entry(batch_author) {
994 std::collections::hash_map::Entry::Occupied(mut entry) => {
995 if entry.get().0 == batch_round {
1000 return Ok(());
1001 }
1002 entry.insert((batch_round, batch_id, signature));
1004 }
1005 std::collections::hash_map::Entry::Vacant(entry) => {
1007 entry.insert((batch_round, batch_id, signature));
1009 }
1010 };
1011
1012 let self_ = self.clone();
1014 tokio::spawn(async move {
1015 let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1016 if self_.gateway.send(peer_ip, event).await.is_some() {
1018 debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1019 }
1020 });
1021
1022 Ok(())
1023 }
1024
1025 async fn process_batch_signature_from_peer(
1034 &self,
1035 peer_ip: SocketAddr,
1036 batch_signature: BatchSignature<N>,
1037 ) -> Result<()> {
1038 self.check_proposed_batch_for_expiration().await?;
1040
1041 let BatchSignature { batch_id, signature } = batch_signature;
1043
1044 let signer = signature.to_address();
1046
1047 if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1049 self.gateway.disconnect(peer_ip);
1051 bail!("Malicious peer - batch signature is from a different validator ({signer})");
1052 }
1053 if self.gateway.account().address() == signer {
1055 bail!("Invalid peer - received a batch signature from myself ({signer})");
1056 }
1057
1058 let self_ = self.clone();
1059 let Some(proposal) = spawn_blocking!({
1060 let mut proposed_batch = self_.proposed_batch.write();
1062 match proposed_batch.as_mut() {
1064 Some(proposal) => {
1065 if proposal.batch_id() != batch_id {
1067 match self_.storage.contains_batch(batch_id) {
1068 true => {
1070 debug!(
1071 "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
1072 proposal.round()
1073 );
1074 return Ok(None);
1075 }
1076 false => bail!(
1078 "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1079 proposal.batch_id(),
1080 proposal.round()
1081 ),
1082 }
1083 }
1084 let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1086 let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
1088 bail!("Signature is from a disconnected validator");
1089 };
1090 let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1092
1093 if new_signature {
1094 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1095 if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1097 return Ok(None);
1099 }
1100 } else {
1101 debug!(
1102 "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
1103 round = proposal.round()
1104 );
1105 return Ok(None);
1106 }
1107 }
1108 None => return Ok(None),
1110 };
1111 match proposed_batch.take() {
1113 Some(proposal) => Ok(Some(proposal)),
1114 None => Ok(None),
1115 }
1116 })?
1117 else {
1118 return Ok(());
1119 };
1120
1121 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1124
1125 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1127 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1130 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1132 return Err(e);
1133 }
1134
1135 #[cfg(feature = "metrics")]
1136 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1137 Ok(())
1138 }
1139
    /// Processes a batch certificate received from a peer: validates and stores
    /// it, then advances to the next round if the certificate's round has
    /// reached quorum.
    ///
    /// Disconnects the peer and returns `Err` on malicious behavior.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Only accept certificates from authorized validator IPs.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Already processed: nothing to do. Otherwise, persist it as unprocessed
        // so it survives until it is fully handled.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        // We never accept our own certificate from a peer.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Validate the certificate against storage before syncing it in.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate (fetching any missing ancestry from the peer).
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Determine whether the certificate's round has reached quorum.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // The certificate's committee ID must match our committee lookback.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Only advance if our own outstanding proposal (if any) is not for a
        // round at or beyond the certificate's round.
        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        let current_round = self.current_round();

        // Advance to the next round when quorum is reached at or beyond our round.
        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
1232}
1233
1234impl<N: Network> Primary<N> {
    /// Starts the primary's background handler tasks.
    ///
    /// One task is spawned per concern:
    /// - broadcast a periodic `PrimaryPing` (block locators + our most recent certificate),
    /// - process incoming primary pings, batch proposals, batch signatures, and certified
    ///   batches from peers,
    /// - periodically ping the workers, attempt a batch proposal, and advance the round
    ///   once quorum is observed on the current round,
    /// - route unconfirmed solutions and transactions to their assigned workers.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Split the receiver into its per-event channels.
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Periodically broadcast a `PrimaryPing` to all peers.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators on a blocking task.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Find our most recent certificate, scanning backwards from the current round.
                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // Round 0 has no certificates; stop searching.
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Without any certificate of ours, there is nothing to ping with this tick.
                    match certificate {
                        Some(certificate) => certificate,
                        None => continue,
                    }
                };

                // Assemble and broadcast the ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Process incoming primary pings (each carries the peer's latest certificate).
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // Drop pings while this node is still syncing.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                {
                    // Deserialize and process the certificate off the receive loop.
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Periodically ping all workers (skipped while syncing).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Periodically attempt to propose a batch.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Skip this tick if a proposal is already in flight.
                // NOTE(review): the `try_lock` guard is dropped immediately; presumably
                // `propose_batch` acquires the lock itself — confirm.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Process incoming batch proposals from peers (we may sign them).
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process each proposal on its own task so the receive loop stays responsive.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Process incoming batch signatures for our own proposal.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Process incoming certified batches.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Deserialize and process each certificate on its own task.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // Periodically try to advance the round once the current round reaches quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                // Quorum check: do the current round's certificate authors reach the threshold?
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Route unconfirmed solutions to their assigned worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // The (id, checksum) pair determines the worker deterministically.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Report the outcome to the caller; ignore a dropped callback.
                    callback.send(result).ok();
                });
            }
        });

        // Route unconfirmed transactions to their assigned worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // The (id, checksum) pair determines the worker deterministically.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Report the outcome to the caller; ignore a dropped callback.
                    callback.send(result).ok();
                });
            }
        });
    }
1560
1561 async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1563 let is_expired = match self.proposed_batch.read().as_ref() {
1565 Some(proposal) => proposal.round() < self.current_round(),
1566 None => false,
1567 };
1568 if is_expired {
1570 let proposal = self.proposed_batch.write().take();
1572 if let Some(proposal) = proposal {
1573 debug!("Cleared expired proposal for round {}", proposal.round());
1574 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1575 }
1576 }
1577 Ok(())
1578 }
1579
    /// Attempts to advance this primary to `next_round`, fast-forwarding storage if the
    /// target round is within the GC window, consulting the BFT (when enabled) for
    /// permission to advance, and proposing a new batch once cleared.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the target round is within GC range, fast-forward storage one round at a
        // time up to (next_round - 1), clearing any proposal for each superseded round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Drop any batch we proposed for a now-stale round.
                *self.proposed_batch.write() = None;
            }
        }

        // Re-read the current round after any fast-forwarding above.
        let current_round = self.current_round();
        if current_round < next_round {
            // When the BFT is enabled, it decides whether the primary may advance;
            // otherwise advance storage directly and consider ourselves ready.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(err) => {
                        // A failed BFT update is fatal for this attempt: log and propagate.
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // Only propose a batch once we are cleared to advance.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
1630
1631 fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1635 let current_round = self.current_round();
1637 if current_round + self.storage.max_gc_rounds() <= batch_round {
1639 bail!("Round {batch_round} is too far in the future")
1640 }
1641 if current_round > batch_round + 1 {
1645 bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1646 }
1647 if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1649 if signing_round > batch_round {
1650 bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1651 }
1652 }
1653 Ok(())
1654 }
1655
1656 fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1659 let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1661 Some(certificate) => certificate.timestamp(),
1663 None => match self.gateway.account().address() == author {
1664 true => *self.latest_proposed_batch_timestamp.read(),
1666 false => return Ok(()),
1668 },
1669 };
1670
1671 let elapsed = timestamp
1673 .checked_sub(previous_timestamp)
1674 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1675 match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1677 true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1678 false => Ok(()),
1679 }
1680 }
1681
    /// Certifies the signed `proposal`, stores the resulting certificate, forwards it to
    /// the BFT (if enabled), broadcasts it to all peers, then advances to the next round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Build the certificate; run it via block_in_place so it does not stall the executor.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into the map form expected by storage.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certificate (with its transmissions) on a blocking task.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // Insert the certificate into the BFT DAG; a failure here aborts the broadcast.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                return Err(err);
            };
        }
        // Broadcast the new certificate to all connected peers.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Record how long the propose -> certify cycle took.
        #[cfg(feature = "metrics")]
        if let Some(start) = self.batch_propose_start.lock().take() {
            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
        }
        // Advance to the round after the one just certified.
        self.try_increment_to_the_next_round(round + 1).await
    }
1716
1717 fn insert_missing_transmissions_into_workers(
1719 &self,
1720 peer_ip: SocketAddr,
1721 transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1722 ) -> Result<()> {
1723 assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1725 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1726 })
1727 }
1728
1729 fn reinsert_transmissions_into_workers(
1731 &self,
1732 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1733 ) -> Result<()> {
1734 assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1736 worker.reinsert(transmission_id, transmission);
1737 })
1738 }
1739
    /// Syncs storage with a batch certificate received from `peer_ip`: first syncs with
    /// its header (fetching missing transmissions and, recursively, missing previous
    /// certificates), then inserts this certificate and forwards it to the BFT (if enabled).
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Ignore certificates from rounds that have already been garbage-collected.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // Ignore certificates we already hold.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // Unless explicitly syncing, refuse certificates while the node is out of sync.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync with the header first; this fetches the missing transmissions and
        // recursively ingests any missing previous certificates.
        let missing_transmissions =
            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

        // Re-check: the recursive sync above may have inserted this certificate already.
        if !self.storage.contains_certificate(certificate.id()) {
            // Insert the certificate and its missing transmissions on a blocking task.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // Forward the certificate to the BFT DAG, if the BFT is enabled.
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    let err = err.context("Failed to update the BFT DAG from sync");
                    warn!("{}", &flatten_error(&err));
                    return Err(err);
                };
            }
        }
        Ok(())
    }
1799
1800 async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1802 &self,
1803 peer_ip: SocketAddr,
1804 batch_header: &BatchHeader<N>,
1805 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1806 let batch_round = batch_header.round();
1808
1809 if batch_round <= self.storage.gc_round() {
1811 bail!("Round {batch_round} is too far in the past")
1812 }
1813
1814 if !IS_SYNCING && !self.is_synced() {
1816 bail!(
1817 "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1818 fmt_id(batch_header.batch_id())
1819 );
1820 }
1821
1822 let is_quorum_threshold_reached = {
1824 let authors = self.storage.get_certificate_authors_for_round(batch_round);
1825 let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1826 committee_lookback.is_quorum_threshold_reached(&authors)
1827 };
1828
1829 let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1834 let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1836 if is_behind_schedule || is_peer_far_in_future {
1838 self.try_increment_to_the_next_round(batch_round).await?;
1840 }
1841
1842 let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1844
1845 let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1847
1848 let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1850 missing_transmissions_handle,
1851 missing_previous_certificates_handle,
1852 ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1853
1854 for batch_certificate in missing_previous_certificates {
1856 if CHECK_PREVIOUS_CERTIFICATES {
1861 self.storage.check_incoming_certificate(&batch_certificate)?;
1862 }
1863 self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1865 }
1866 Ok(missing_transmissions)
1867 }
1868
    /// Fetches every transmission referenced by `batch_header` that is missing from
    /// storage, delegating each fetch to the worker responsible for that transmission
    /// (which may in turn request it from `peer_ip`).
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Nothing to fetch for rounds that have already been garbage-collected.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // If the batch itself is already known, no transmissions are missing.
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // Collect one fetch future per missing transmission; they resolve in any order.
        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            // Only fetch transmissions that storage does not already contain.
            if !self.storage.contains_transmission(*transmission_id) {
                // Route the fetch to the worker deterministically assigned to this ID.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        // Drain the futures, failing fast on the first fetch error.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }
1922
1923 async fn fetch_missing_previous_certificates(
1925 &self,
1926 peer_ip: SocketAddr,
1927 batch_header: &BatchHeader<N>,
1928 ) -> Result<HashSet<BatchCertificate<N>>> {
1929 let round = batch_header.round();
1931 if round == 1 || round <= self.storage.gc_round() + 1 {
1933 return Ok(Default::default());
1934 }
1935
1936 let missing_previous_certificates =
1938 self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1939 if !missing_previous_certificates.is_empty() {
1940 debug!(
1941 "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1942 missing_previous_certificates.len(),
1943 );
1944 }
1945 Ok(missing_previous_certificates)
1947 }
1948
    /// Resolves `certificate_ids` into certificates: IDs already in the ledger or storage
    /// are skipped, locally-buffered (unprocessed) copies are used directly, and the rest
    /// are requested from `peer_ip`.
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let mut fetch_certificates = FuturesUnordered::new();
        let mut missing_certificates = HashSet::default();
        for certificate_id in certificate_ids {
            // Skip certificates already committed to the ledger.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            // Skip certificates already present in storage.
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // Use a locally-buffered (unprocessed) copy when available...
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                // ...otherwise request the certificate from the peer.
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        // Short-circuit when nothing needs to be fetched over the network.
        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Await all outstanding requests, failing fast on the first error.
        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        Ok(missing_certificates)
    }
1999}
2000
2001impl<N: Network> Primary<N> {
2002 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2004 self.handles.lock().push(tokio::spawn(future));
2005 }
2006
    /// Shuts down the primary: stops the workers, aborts all handler tasks, persists the
    /// current proposal state to disk, then shuts down the gateway.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Stop all workers first so no new transmissions are processed.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort every spawned handler task.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Snapshot the in-flight proposal state so it can be restored on restart.
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            // Prefer the proposal's round; otherwise fall back to the propose-lock round.
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        // Best-effort persistence: log (but do not propagate) a storage failure.
        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
        }
        self.gateway.shut_down().await;
    }
2028}
2029
2030#[cfg(test)]
2031mod tests {
2032 use super::*;
2033 use snarkos_node_bft_ledger_service::MockLedgerService;
2034 use snarkos_node_bft_storage_service::BFTMemoryService;
2035 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2036 use snarkvm::{
2037 ledger::{
2038 committee::{Committee, MIN_VALIDATOR_STAKE},
2039 test_helpers::sample_execution_transaction_with_fee,
2040 },
2041 prelude::{Address, Signature},
2042 };
2043
2044 use bytes::Bytes;
2045 use indexmap::IndexSet;
2046 use rand::RngCore;
2047
2048 type CurrentNetwork = snarkvm::prelude::MainnetV0;
2049
2050 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2051 const COMMITTEE_SIZE: usize = 4;
2053 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2054 let mut members = IndexMap::new();
2055
2056 for i in 0..COMMITTEE_SIZE {
2057 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2058 let account = Account::new(rng).unwrap();
2059
2060 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2061 accounts.push((socket_addr, account));
2062 }
2063
2064 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2065 }
2066
    /// Builds a `Primary` for `accounts[account_index]` over the given committee,
    /// backed by a mock ledger at `height`, with a single worker and peers registered.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        // Mock ledger plus in-memory storage with a GC window of 10 rounds.
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);

        let account = accounts[account_index].1.clone();
        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
        let mut primary =
            Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
                .unwrap();

        // Replace the primary's workers with a single test worker (ID 0).
        primary.workers = Arc::from([Worker::new(
            0,
            Arc::new(primary.gateway.clone()),
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
        )
        .unwrap()]);
        // NOTE(review): `skip(account_index)` also registers this primary's *own* account
        // as a connected peer — confirm whether self-connection is intended here.
        for a in accounts.iter().skip(account_index) {
            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
        }

        primary
    }
2099
2100 fn primary_without_handlers(
2101 rng: &mut TestRng,
2102 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2103 let (accounts, committee) = sample_committee(rng);
2104 let primary = primary_with_committee(
2105 0, &accounts,
2107 committee,
2108 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2109 );
2110
2111 (primary, accounts)
2112 }
2113
2114 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2116 let solution_id = rng.r#gen::<u64>().into();
2118 let size = rng.gen_range(1024..10 * 1024);
2120 let mut vec = vec![0u8; size];
2122 rng.fill_bytes(&mut vec);
2123 let solution = Data::Buffer(Bytes::from(vec));
2124 (solution_id, solution)
2126 }
2127
2128 fn sample_unconfirmed_transaction(
2130 rng: &mut TestRng,
2131 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2132 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2133 let id = transaction.id();
2134
2135 (id, Data::Object(transaction))
2136 }
2137
2138 fn create_test_proposal(
2140 author: &Account<CurrentNetwork>,
2141 committee: Committee<CurrentNetwork>,
2142 round: u64,
2143 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2144 timestamp: i64,
2145 num_transactions: u64,
2146 rng: &mut TestRng,
2147 ) -> Proposal<CurrentNetwork> {
2148 let mut transmission_ids = IndexSet::new();
2149 let mut transmissions = IndexMap::new();
2150
2151 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2153 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2154 let solution_transmission_id = (solution_id, solution_checksum).into();
2155 transmission_ids.insert(solution_transmission_id);
2156 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2157
2158 for _ in 0..num_transactions {
2160 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2161 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2162 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2163 transmission_ids.insert(transaction_transmission_id);
2164 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2165 }
2166
2167 let private_key = author.private_key();
2169 let batch_header = BatchHeader::new(
2171 private_key,
2172 round,
2173 timestamp,
2174 committee.id(),
2175 transmission_ids,
2176 previous_certificate_ids,
2177 rng,
2178 )
2179 .unwrap();
2180 Proposal::new(committee, batch_header, transmissions).unwrap()
2182 }
2183
2184 fn peer_signatures_for_proposal(
2187 primary: &Primary<CurrentNetwork>,
2188 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2189 rng: &mut TestRng,
2190 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2191 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2193 for (socket_addr, account) in accounts {
2194 if account.address() == primary.gateway.account().address() {
2195 continue;
2196 }
2197 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2198 let signature = account.sign(&[batch_id], rng).unwrap();
2199 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2200 }
2201
2202 signatures
2203 }
2204
2205 fn peer_signatures_for_batch(
2207 primary_address: Address<CurrentNetwork>,
2208 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2209 batch_id: Field<CurrentNetwork>,
2210 rng: &mut TestRng,
2211 ) -> IndexSet<Signature<CurrentNetwork>> {
2212 let mut signatures = IndexSet::new();
2213 for (_, account) in accounts {
2214 if account.address() == primary_address {
2215 continue;
2216 }
2217 let signature = account.sign(&[batch_id], rng).unwrap();
2218 signatures.insert(signature);
2219 }
2220 signatures
2221 }
2222
    /// Builds a batch certificate authored by `primary_address` for `round`, containing
    /// one sampled solution and one sampled transaction, signed by all other accounts.
    /// Returns the certificate together with its transmissions.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Locate the author's account so the batch header can be signed with its key.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // A random committee ID is used here — presumably it is not validated by these
        // tests; confirm against the certificate-verification path.
        let committee_id = Field::rand(rng);
        // Sample one solution and one transaction as the batch contents.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Derive the (id, checksum) transmission IDs.
        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Author the batch header, then certify it with the other members' signatures.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
2267
2268 fn store_certificate_chain(
2270 primary: &Primary<CurrentNetwork>,
2271 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2272 round: u64,
2273 rng: &mut TestRng,
2274 ) -> IndexSet<Field<CurrentNetwork>> {
2275 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2276 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2277 for cur_round in 1..round {
2278 for (_, account) in accounts.iter() {
2279 let (certificate, transmissions) = create_batch_certificate(
2280 account.address(),
2281 accounts,
2282 cur_round,
2283 previous_certificates.clone(),
2284 rng,
2285 );
2286 next_certificates.insert(certificate.id());
2287 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2288 }
2289
2290 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2291 previous_certificates = next_certificates;
2292 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2293 }
2294
2295 previous_certificates
2296 }
2297
2298 fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2301 for (addr, acct) in accounts.iter().skip(1) {
2303 primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2304 }
2305 }
2306
2307 #[tokio::test]
2308 async fn test_propose_batch() {
2309 let mut rng = TestRng::default();
2310 let (primary, _) = primary_without_handlers(&mut rng);
2311
2312 assert!(primary.proposed_batch.read().is_none());
2314
2315 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2317 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2318
2319 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2321 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2322
2323 assert!(primary.propose_batch().await.is_ok());
2325 assert!(primary.proposed_batch.read().is_some());
2326 }
2327
2328 #[tokio::test]
2329 async fn test_propose_batch_with_no_transmissions() {
2330 let mut rng = TestRng::default();
2331 let (primary, _) = primary_without_handlers(&mut rng);
2332
2333 assert!(primary.proposed_batch.read().is_none());
2335
2336 assert!(primary.propose_batch().await.is_ok());
2338 assert!(primary.proposed_batch.read().is_some());
2339 }
2340
2341 #[tokio::test]
2342 async fn test_propose_batch_in_round() {
2343 let round = 3;
2344 let mut rng = TestRng::default();
2345 let (primary, accounts) = primary_without_handlers(&mut rng);
2346
2347 store_certificate_chain(&primary, &accounts, round, &mut rng);
2349
2350 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2352
2353 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2355 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2356
2357 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2359 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2360
2361 assert!(primary.propose_batch().await.is_ok());
2363 assert!(primary.proposed_batch.read().is_some());
2364 }
2365
#[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Advance the storage to `round` with a chain of certificates.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // The previous round's certificate IDs anchor the new certificates created below.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Counts how many transmissions end up committed by this round's certificates.
        let mut num_transmissions_in_previous_round = 0;

        // Sample a fresh solution and transaction (and their checksums, needed to build
        // the `TransmissionID`s asserted on at the end).
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Queue the fresh transmissions on worker 0.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // The worker should hold exactly the two fresh transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate per committee member for the current round and insert each
        // into storage, while also feeding its transmissions to worker 0.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Make the certificate's transmissions known to the worker, as if a peer sent them.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Wait out the minimum batch delay before proposing.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance the storage to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // The worker now holds the certificate transmissions plus the two fresh ones.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose a batch.
        assert!(primary.propose_batch().await.is_ok());

        // Only the two fresh transmissions may appear in the proposal; transmissions already
        // committed by the stored certificates must be skipped.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }
2437
2438 #[tokio::test]
2439 async fn test_propose_batch_over_spend_limit() {
2440 let mut rng = TestRng::default();
2441
2442 let (accounts, committee) = sample_committee(&mut rng);
2444 let primary = primary_with_committee(
2445 0,
2446 &accounts,
2447 committee.clone(),
2448 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2449 );
2450
2451 assert!(primary.proposed_batch.read().is_none());
2453 primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2455
2456 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2458 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2459
2460 for _i in 0..5 {
2461 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2462 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2464 }
2465
2466 assert!(primary.propose_batch().await.is_ok());
2468 assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2470 assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2472 }
2473
#[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Build a valid round-1 proposal authored by a committee peer.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make the proposal's transmissions available on worker 0, as if the peer sent them.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer in the gateway resolver so the batch author can be resolved.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Feed the peer's locators (height 0) and run a sync pass so the primary
        // considers itself synced before handling the proposal.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // The valid proposal must be accepted.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }
2512
2513 #[tokio::test]
2514 async fn test_batch_propose_from_peer_when_not_synced() {
2515 let mut rng = TestRng::default();
2516 let (primary, accounts) = primary_without_handlers(&mut rng);
2517
2518 let round = 1;
2520 let peer_account = &accounts[1];
2521 let peer_ip = peer_account.0;
2522 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2523 let proposal = create_test_proposal(
2524 &peer_account.1,
2525 primary.ledger.current_committee().unwrap(),
2526 round,
2527 Default::default(),
2528 timestamp,
2529 1,
2530 &mut rng,
2531 );
2532
2533 for (transmission_id, transmission) in proposal.transmissions() {
2535 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2536 }
2537
2538 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2540
2541 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2543
2544 assert!(
2546 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2547 );
2548 }
2549
#[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Advance the primary to `round` by storing a chain of certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Build a proposal for the current round, anchored on the previous certificates.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make the proposal's transmissions available on worker 0.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's address with the gateway resolver.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Feed the peer's locators (height 0) and run a sync pass so the primary
        // considers itself synced.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // The proposal for the current round must be accepted.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }
2589
#[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid round-1 proposal authored by a committee peer.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make the proposal's transmissions available on worker 0.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's address, then run a sync pass.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Send the proposal wrapped with a mismatched round (`round + 1` vs. the
        // header's round); it must be rejected.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }
2631
#[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Advance the primary to `round` by storing a chain of certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal for the current round, anchored on the previous certificates.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make the proposal's transmissions available on worker 0.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's address, then run a sync pass.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Send the proposal wrapped with a mismatched round (`round + 1` vs. the
        // header's round); it must be rejected.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }
2676
#[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Advance the primary to `round` by storing a chain of certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fetch the timestamp of the peer's certificate in the previous round, then pick a
        // timestamp one second short of the minimum required batch spacing.
        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        // Create a proposal carrying the too-early timestamp.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        // Make the proposal's transmissions available on worker 0.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // Register the peer's address, then run a sync pass.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // The proposal must be rejected for violating the minimum batch delay.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2726
#[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Two primaries over the same committee: one activating consensus V4, one V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        // Build a peer proposal with 4 transmissions — enough to trip the V5 spend-limit
        // check (cf. the assertions below).
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Feed the proposal's transmissions to worker 0 of both primaries.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Register the peer's address on both gateways.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Mark both primaries as synced (peer locators at height 0, then a sync pass).
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // Under V4 the oversized proposal is still accepted...
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        // ...while under V5 the spend-limit check rejects it.
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }
2790
#[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // No proposal exists yet.
        assert!(primary.proposed_batch.read().is_none());

        // Queue a solution and a transaction on worker 0.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Artificially raise the propose lock past the storage round.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // With the lock ahead of the storage round, propose_batch returns Ok
        // but declines to produce a proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the original lock round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Proposing now succeeds and stores a proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2823
#[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Advance storage to `round` via a chain of certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Pre-install a proposal for a round ahead of the storage round (`round + 1`).
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // propose_batch must keep the existing ahead-of-round proposal rather than
        // replace it with one for the (older) storage round.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
2853
#[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Author a round-1 proposal from this primary's own account.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Stage the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Deliver a signature from every committee peer.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // With quorum reached, the certificate is stored and the round advances.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }
2889
#[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Advance storage to `round` via a chain of certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Author a proposal for the current round from this primary's own account.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Stage the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Deliver a signature from every committee peer.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // With quorum reached, the certificate is stored and the round advances.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round + 1);
    }
2928
2929 #[tokio::test]
2930 async fn test_batch_signature_from_peer_no_quorum() {
2931 let mut rng = TestRng::default();
2932 let (primary, accounts) = primary_without_handlers(&mut rng);
2933 map_account_addresses(&primary, &accounts);
2934
2935 let round = 1;
2937 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2938 let proposal = create_test_proposal(
2939 primary.gateway.account(),
2940 primary.ledger.current_committee().unwrap(),
2941 round,
2942 Default::default(),
2943 timestamp,
2944 1,
2945 &mut rng,
2946 );
2947
2948 *primary.proposed_batch.write() = Some(proposal);
2950
2951 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2953
2954 let (socket_addr, signature) = signatures.first().unwrap();
2956 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2957
2958 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2960 assert_eq!(primary.current_round(), round);
2962 }
2963
#[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Advance storage to `round` via a chain of certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Author a proposal for the current round from this primary's own account.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Stage the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Deliver only the first peer signature — short of quorum.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Without quorum, no certificate is created and the round does not advance.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        assert_eq!(primary.current_round(), round);
    }
3001
#[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Advance storage to `round` via a chain of certificates.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // The previous round's certificate IDs anchor the new certificate below.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Queue a fresh solution and transaction on worker 0.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // The worker should hold exactly the two fresh transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Build a certificate for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly split the certificate's transmissions into "aborted" and "kept".
        // The `|| is_empty()` guard guarantees at least one aborted transmission.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Only the kept transmissions are made available on the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Without declaring the aborted IDs, both checking and inserting the certificate
        // must fail, since some of its transmissions cannot be found.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Supplying the aborted set alongside the kept transmissions makes the insert succeed.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        assert!(primary.storage.contains_certificate(certificate_id));
        // Aborted transmissions are tracked by the storage, but carry no retrievable payload.
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
3083}