use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};

use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
use snarkos_utilities::NodeDataDir;

use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{ConsensusVersion, committee::Committee},
    utilities::flatten_error,
};

use anyhow::Context;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

/// The current batch proposal, if any, guarded by a read-write lock.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

/// The primary instance of the memory pool, responsible for proposing batches and certifying them.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal currently awaiting signatures, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the latest batch proposed by this primary.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The batch proposals this primary has recently signed, keyed by author.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles of the spawned background tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock used to serialize batch proposals; it stores the latest proposed round.
    propose_lock: Arc<TMutex<u64>>,
    /// The node's data directory.
    node_data_dir: NodeDataDir,
}
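
// Illustrative lifecycle sketch (comment only, not part of the API): a validator node constructs
// the primary, runs it with its channels, and shuts it down on exit. The channel construction and
// the surrounding node setup are assumed to be provided by the caller.
//
//     let mut primary = Primary::new(account, storage, ledger, block_sync, None, &[], false, node_data_dir, None)?;
//     primary.run(ping, bft_sender, primary_sender, primary_receiver).await?;
//     // ... on shutdown:
//     primary.shut_down().await;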

impl<N: Network> Primary<N> {
    /// The maximum number of transmissions to tolerate, set to twice the per-batch maximum.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        node_data_dir: NodeDataDir,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Construct the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            node_data_dir.clone(),
            dev,
        )?;
        // Construct the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            node_data_dir,
        })
    }

    /// Loads the proposal cache from disk, if it exists, and restores the primary's proposal state.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the signed proposals from the file system, if the cache exists.
        match ProposalCache::<N>::exists(&self.node_data_dir) {
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
                Ok(proposal_cache) => {
                    // Extract the proposal, signed proposals, and pending certificates.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Restore the proposed batch, the signed proposals, and the propose lock.
                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Reprocess the pending certificates; DUMMY_SELF_IP is used since the transmissions
                    // are expected to already be in storage. If they are not, the certificate is skipped
                    // with a warning.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            false => Ok(()),
        }
    }

    /// Runs the primary instance: spins up the workers, the sync module, the gateway, and the handlers.
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender, if one was provided.
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct the workers and their channels.
        let mut worker_senders = IndexMap::new();
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let (tx_worker, rx_worker) = init_worker_channels();
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            worker.run(rx_worker);
            workers.push(worker);
            worker_senders.insert(id, tx_worker);
        }
        self.workers = Arc::from(workers);

        // Start the sync module, restore any cached proposal state, and run the gateway and handlers.
        let (sync_sender, sync_receiver) = init_sync_channels();
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run(ping, sync_receiver).await?;
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        self.start_handlers(primary_receiver);

        Ok(())
    }

    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
    /// Proposes a new batch for the current round, if the primary is ready to do so.
    pub async fn propose_batch(&self) -> Result<()> {
        // Acquire the proposal lock; it also records the latest round proposed by this primary.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if so.
        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .await
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(());
        }

        // Retrieve the current round and the previous round.
        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        // Ensure the primary is not proposing for round 0.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // Do not propose for a round below the cached proposal round.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is already a proposed batch, resend it to any non-signers and return.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure storage is caught up to the existing proposal before resending it.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Resend the existing proposal to the non-signers.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that this primary does not create a new proposal too quickly.
        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!(
                "{}",
                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
            );
            return Ok(());
        }

        // If this round was already certified by this primary, advance the BFT instead of re-proposing.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(()),
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", &flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Do not propose the same round twice.
        if round == *lock_guard {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Ensure enough committee members are connected to reach quorum on the proposal.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the certificates for the previous round.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Ensure the previous round has reached quorum before proposing the next round.
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Drain transmissions from the workers, validating each one and tracking the cumulative spend.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        // Note: the current implementation assumes a single worker.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Stop once the batch is full; put the transmission back for the next proposal.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Move on to the next worker once this worker's quota is reached.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Skip transmissions that are already in the ledger.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Skip transmissions that are already in storage.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Validate the transmission before including it in the proposal.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check the solution is well-formed.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction (bounded by the maximum transaction size).
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Deployments are excluded between Consensus V7 and V8.
                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Compute the transaction's spend, discarding it if the cost cannot be computed.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check the transaction is well-formed.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Accumulate the proposal cost, discarding the transaction on overflow.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Stop once the batch spend limit would be exceeded; put the transmission back.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    // Ratifications are not included in batch proposals.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    _ => continue,
                }

                // Include the transmission in the proposal.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record this round as proposed.
        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Construct the batch header and the proposal, reinserting the transmissions on failure.
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;
        // Broadcast the proposal to the committee and record it as the current proposed batch.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }
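
    // Spend-limit accounting in `propose_batch` above, in miniature (illustrative numbers only):
    // with a batch spend limit of 100, transactions costing 40 and 50 are accepted (running total
    // 90); a further cost of 20 would raise the total to 110 > 100, so that transmission is pushed
    // back to the worker and the proposal closes with the first two. `checked_add` guards the
    // running total against overflow instead of silently wrapping.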

    /// Processes a batch proposal from a peer: validates the proposal and, if everything checks out,
    /// signs the batch and sends the signature back to the peer.
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header and ensure its round matches the advertised round.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        let batch_author = batch_header.author();

        // Ensure the peer is the author of the batch, is an authorized committee member, and is not this node.
        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // Ensure the batch was built against the expected committee.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // If this primary already signed a proposal from this author, handle re-proposals and resends.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // Reject proposals for rounds earlier than the latest signed round.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // Reject a different batch for an already-signed round.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // If this exact batch was already signed, resend the existing signature.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        // Skip proposals whose batch is already certified in storage.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Ensure the proposal timestamp is not too soon after the author's previous certificate.
        let previous_round = batch_round.saturating_sub(1);
        if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
        }

        // Ratification transmissions are not allowed in batch proposals.
        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'",
            );
        }

        // Sync with the batch header and retrieve any missing transmissions.
        let mut missing_transmissions =
            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;

        // Ensure the missing transmissions are well-formed.
        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            let err = err.context(format!(
                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
            ));
            debug!("{}", flatten_error(err));
            return Ok(());
        }

        // Ensure this primary is still willing to sign for the batch round.
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Check the batch header against storage, retrieving the transmissions to insert.
        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let Some(missing_transmissions) =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
        else {
            return Ok(());
        };

        // Insert the missing transmissions into the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // From Consensus V5 onward, enforce the batch spend limit on the proposed transactions.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    // Deserialize the transaction (bounded by the maximum transaction size).
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    // Deployments are excluded between Consensus V7 and V8.
                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
                    if block_height > consensus_version_v7_height
                        && block_height <= consensus_version_v8_height
                        && transaction.is_deploy()
                    {
                        bail!(
                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
                        )
                    }

                    // Compute the transaction's spend.
                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Accumulate the proposal cost, rejecting the batch on overflow.
                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Reject the batch if it exceeds the spend limit.
                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
                    if next_proposal_cost > batch_spend_limit {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            batch_spend_limit
                        );
                    }

                    proposal_cost = next_proposal_cost;
                }
            }
        }

        // Sign the batch ID.
        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        // Record the signed proposal, keeping at most one signature per author per round.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If a batch for this round was already signed, do not sign another one.
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Send the signature back to the proposing peer.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

    /// Processes a batch signature from a peer, and certifies and broadcasts the batch once the
    /// quorum threshold of signatures is reached.
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        // Clear the proposed batch if it has expired.
        self.check_proposed_batch_for_expiration().await?;

        let BatchSignature { batch_id, signature } = batch_signature;

        // Recover the signer and ensure it is the connected peer, and not this node.
        let signer = signature.to_address();
        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        // Add the signature to the proposal; take the proposal out once quorum is reached.
        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            let mut proposed_batch = self_.proposed_batch.write();
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    // Ensure the signature is for the current proposal.
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    // Add the signature, checking it against the committee lookback.
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;

                    if new_signature {
                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                        // Keep waiting until the quorum threshold is reached.
                        if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                            return Ok(None);
                        }
                    } else {
                        debug!(
                            "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
                            round = proposal.round()
                        );
                        return Ok(None);
                    }
                }
                None => return Ok(None),
            };
            // Quorum reached - take the proposal out to certify it.
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        // Store and broadcast the certificate, reinserting the transmissions on failure.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

    /// Processes a batch certificate from a peer, storing it and advancing the round once quorum is reached.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the certificate comes from an authorized validator IP.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Skip certificates already in storage; otherwise buffer them as unprocessed.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        let author = certificate.author();
        let certificate_round = certificate.round();
        let committee_id = certificate.committee_id();

        // Ensure the certificate was not authored by this node.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Validate the incoming certificate against storage.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, syncing any missing transmissions and previous certificates.
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // Check whether the certificate's round has reached quorum.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure the certificate was built against the expected committee.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Only advance if this primary has no pending proposal for the certificate's round (or later).
        let should_advance = match &*self.proposed_batch.read() {
            Some(proposal) => proposal.round() < certificate_round,
            None => true,
        };

        let current_round = self.current_round();

        // Advance to the next round if quorum is reached and the certificate is not behind.
        if is_quorum && should_advance && certificate_round >= current_round {
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
}

impl<N: Network> Primary<N> {
    /// Starts the primary's background handlers for pings, proposals, signatures, certificates,
    /// round advancement, and unconfirmed solutions and transactions.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Broadcast a primary ping periodically.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve this primary's latest batch certificate.
                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    // Search backwards from the current round for the latest certificate authored by this primary.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    match certificate {
                        Some(certificate) => certificate,
                        None => continue,
                    }
                };

                // Broadcast the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Process incoming primary pings.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Process the peer's batch certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Broadcast worker pings periodically.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Periodically try to propose a batch.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Skip this tick if another task currently holds the propose lock.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Process incoming batch proposals.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Process incoming batch signatures.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Process incoming batch certificates.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // Periodically try to advance to the next round once the current round has reached quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Process unconfirmed solutions by routing them to the appropriate worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    callback.send(result).ok();
                });
            }
        });

        // Process unconfirmed transactions by routing them to the appropriate worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    callback.send(result).ok();
                });
            }
        });
    }

    /// Clears the proposed batch if it is for a round below the current round, reinserting its
    /// transmissions into the workers.
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        // Check if the proposed batch is stale.
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        // If so, clear it and reinsert its transmissions.
        if is_expired {
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }

    /// Advances storage to `next_round` (within the GC window), notifies the BFT, and proposes a
    /// batch for the new round if the primary is ready.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // Fast-forward storage up to (but not including) the next round, within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // If the BFT is enabled, let it drive the round update; otherwise increment storage directly.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", flatten_error(&err));
                        return Err(err);
                    }
                }
            } else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

    /// Ensures this primary is still willing to sign a batch for the given round.
    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
        let current_round = self.current_round();
        // Reject rounds too far in the future.
        if current_round + self.storage.max_gc_rounds() <= batch_round {
            bail!("Round {batch_round} is too far in the future")
        }
        // Reject rounds that the primary has already moved past.
        if current_round > batch_round + 1 {
            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
        }
        // Reject rounds below the round of the primary's own proposal.
        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
            if signing_round > batch_round {
                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
            }
        }
        Ok(())
    }

    /// Ensures the given proposal timestamp is not too soon after the author's previous certificate
    /// (or, for this primary, its own latest proposal).
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Determine the reference timestamp to compare against.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                true => *self.latest_proposed_batch_timestamp.read(),
                false => return Ok(()),
            },
        };

        // Ensure the elapsed time is at least the minimum batch delay.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }
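
    // A numeric illustration of the two gates above (values are illustrative only): with, say,
    // MIN_BATCH_DELAY_IN_SECS = 5, a proposal timestamped 3 seconds after the author's previous
    // certificate is rejected by `check_proposal_timestamp`; and with current_round = 10 and
    // max_gc_rounds = 50, `ensure_is_signing_round` accepts only batch rounds 9 through 59.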

    /// Converts the proposal into a certificate, stores it, notifies the BFT, broadcasts it, and
    /// advances to the next round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                return Err(err);
            };
        }
        // Broadcast the certified batch to the validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certification and advance to the next round.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        self.try_increment_to_the_next_round(round + 1).await
    }

    /// Inserts the missing transmissions from a peer into the workers.
    fn insert_missing_transmissions_into_workers(
        &self,
        peer_ip: SocketAddr,
        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
        })
    }

    /// Reinserts the given transmissions back into the workers.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Recursively stores a certificate from a peer, first syncing any missing transmissions and
    /// previous certificates it depends on.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Skip certificates that have already been garbage collected or stored.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // Unless this call is part of syncing, refuse to process certificates while the node is syncing.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync with the batch header and retrieve any missing transmissions.
        let missing_transmissions =
            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

        // Store the certificate, if it is still missing, and notify the BFT.
        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    let err = err.context("Failed to update the BFT DAG from sync");
                    warn!("{}", &flatten_error(&err));
                    return Err(err);
                };
            }
        }
        Ok(())
    }

    /// Syncs the state required by a batch header from a peer, returning the missing transmissions.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // Reject rounds that have already been garbage collected.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // Unless this call is part of syncing, refuse to process batch headers while the node is syncing.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine whether the batch round has already reached quorum.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Advance the primary if it has fallen behind the batch round.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Fetch the missing transmissions and previous certificates concurrently.
        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
            missing_transmissions_handle,
            missing_previous_certificates_handle,
        )
        .with_context(|| {
            format!(
                "Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}'"
            )
        })?;

        // Recursively sync each missing previous certificate.
        for batch_certificate in missing_previous_certificates {
            if CHECK_PREVIOUS_CERTIFICATES {
                self.storage.check_incoming_certificate(&batch_certificate)?;
            }
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }

    /// Fetches any transmissions referenced by the batch header that are missing from storage,
    /// requesting them from the peer via the workers.
    async fn fetch_missing_transmissions(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Skip rounds that have already been garbage collected.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // Skip batches that have already been processed.
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // Queue a fetch for each transmission that is not already in storage.
        let mut fetch_transmissions = FuturesUnordered::new();
        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
            }
        }

        // Collect the fetched transmissions as they complete.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }

    /// Fetches the previous certificates referenced by the batch header that are missing locally.
    async fn fetch_missing_previous_certificates(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Round 1 and rounds at or below the GC boundary have no previous certificates to fetch.
        let round = batch_header.round();
        if round == 1 || round <= self.storage.gc_round() + 1 {
            return Ok(Default::default());
        }

        let missing_previous_certificates =
            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
        if !missing_previous_certificates.is_empty() {
            debug!(
                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
                missing_previous_certificates.len(),
            );
        }
        Ok(missing_previous_certificates)
    }

    /// Fetches the given certificates from the peer, skipping those already known locally.
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        let mut fetch_certificates = FuturesUnordered::new();
        let mut missing_certificates = HashSet::default();
        for certificate_id in certificate_ids {
            // Skip certificates already in the ledger or in storage.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // Use the unprocessed copy if one is available; otherwise request it from the peer.
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Collect the fetched certificates as they complete.
        while let Some(result) = fetch_certificates.next().await {
            missing_certificates.insert(result?);
        }
        Ok(missing_certificates)
    }
}
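
// Note on the fetch helpers above: transmission and certificate requests are pushed onto a
// `FuturesUnordered` and awaited as they complete, so requests to the peer proceed concurrently,
// and the `?` on each completed result makes the sync attempt fail fast on the first error.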

impl<N: Network> Primary<N> {
    /// Spawns a task and stores its handle so it can be aborted on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary: stops the workers and handlers, persists the proposal cache to disk,
    /// and shuts down the gateway.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Shut down the workers and abort the spawned handlers.
        self.workers.iter().for_each(|worker| worker.shut_down());
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Persist the current proposal state so it can be restored on restart.
        let proposal_cache = {
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
        }
        // Shut down the gateway.
        self.gateway.shut_down().await;
    }
}
2011
2012#[cfg(test)]
2013mod tests {
2014 use super::*;
2015 use snarkos_node_bft_ledger_service::MockLedgerService;
2016 use snarkos_node_bft_storage_service::BFTMemoryService;
2017 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2018 use snarkvm::{
2019 ledger::{
2020 committee::{Committee, MIN_VALIDATOR_STAKE},
2021 test_helpers::sample_execution_transaction_with_fee,
2022 },
2023 prelude::{Address, Signature},
2024 };
2025
2026 use bytes::Bytes;
2027 use indexmap::IndexSet;
2028 use rand::RngCore;
2029
2030 type CurrentNetwork = snarkvm::prelude::MainnetV0;
2031
2032 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2033 const COMMITTEE_SIZE: usize = 4;
2035 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2036 let mut members = IndexMap::new();
2037
2038 for i in 0..COMMITTEE_SIZE {
2039 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2040 let account = Account::new(rng).unwrap();
2041
2042 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2043 accounts.push((socket_addr, account));
2044 }
2045
2046 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2047 }
2048
2049 fn primary_with_committee(
2051 account_index: usize,
2052 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2053 committee: Committee<CurrentNetwork>,
2054 height: u32,
2055 ) -> Primary<CurrentNetwork> {
2056 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2057 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2058
2059 let account = accounts[account_index].1.clone();
2061 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2062 let mut primary =
2063 Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2064 .unwrap();
2065
2066 primary.workers = Arc::from([Worker::new(
2068 0, Arc::new(primary.gateway.clone()),
2070 primary.storage.clone(),
2071 primary.ledger.clone(),
2072 primary.proposed_batch.clone(),
2073 )
2074 .unwrap()]);
2075 for a in accounts.iter().skip(account_index) {
2076 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2077 }
2078
2079 primary
2080 }
2081
2082 fn primary_without_handlers(
2083 rng: &mut TestRng,
2084 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2085 let (accounts, committee) = sample_committee(rng);
2086 let primary = primary_with_committee(
2087 0, &accounts,
2089 committee,
2090 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2091 );
2092
2093 (primary, accounts)
2094 }
2095
2096 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2098 let solution_id = rng.r#gen::<u64>().into();
2100 let size = rng.gen_range(1024..10 * 1024);
2102 let mut vec = vec![0u8; size];
2104 rng.fill_bytes(&mut vec);
2105 let solution = Data::Buffer(Bytes::from(vec));
2106 (solution_id, solution)
2108 }
2109
2110 fn sample_unconfirmed_transaction(
2112 rng: &mut TestRng,
2113 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2114 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2115 let id = transaction.id();
2116
2117 (id, Data::Object(transaction))
2118 }
2119
2120 fn create_test_proposal(
2122 author: &Account<CurrentNetwork>,
2123 committee: Committee<CurrentNetwork>,
2124 round: u64,
2125 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2126 timestamp: i64,
2127 num_transactions: u64,
2128 rng: &mut TestRng,
2129 ) -> Proposal<CurrentNetwork> {
2130 let mut transmission_ids = IndexSet::new();
2131 let mut transmissions = IndexMap::new();
2132
2133 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2135 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2136 let solution_transmission_id = (solution_id, solution_checksum).into();
2137 transmission_ids.insert(solution_transmission_id);
2138 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2139
2140 for _ in 0..num_transactions {
2142 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2143 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2144 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2145 transmission_ids.insert(transaction_transmission_id);
2146 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2147 }
2148
2149 let private_key = author.private_key();
2151 let batch_header = BatchHeader::new(
2153 private_key,
2154 round,
2155 timestamp,
2156 committee.id(),
2157 transmission_ids,
2158 previous_certificate_ids,
2159 rng,
2160 )
2161 .unwrap();
2162 Proposal::new(committee, batch_header, transmissions).unwrap()
2164 }
2165
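// Collects a batch signature over the primary's currently proposed batch from every account
// except the primary's own.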
2166 fn peer_signatures_for_proposal(
2169 primary: &Primary<CurrentNetwork>,
2170 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2171 rng: &mut TestRng,
2172 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2173 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2175 for (socket_addr, account) in accounts {
2176 if account.address() == primary.gateway.account().address() {
2177 continue;
2178 }
2179 let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2180 let signature = account.sign(&[batch_id], rng).unwrap();
2181 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2182 }
2183
2184 signatures
2185 }
2186
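// Collects signatures over the given batch ID from every account except the batch author.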
2187 fn peer_signatures_for_batch(
2189 primary_address: Address<CurrentNetwork>,
2190 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2191 batch_id: Field<CurrentNetwork>,
2192 rng: &mut TestRng,
2193 ) -> IndexSet<Signature<CurrentNetwork>> {
2194 let mut signatures = IndexSet::new();
2195 for (_, account) in accounts {
2196 if account.address() == primary_address {
2197 continue;
2198 }
2199 let signature = account.sign(&[batch_id], rng).unwrap();
2200 signatures.insert(signature);
2201 }
2202 signatures
2203 }
2204
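// Creates a batch certificate authored by `primary_address` for the given round, along with the
// transmissions (one solution and one transaction) referenced by its header.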
2205 fn create_batch_certificate(
2207 primary_address: Address<CurrentNetwork>,
2208 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2209 round: u64,
2210 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2211 rng: &mut TestRng,
2212 ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2213 let timestamp = now();
2214
2215 let author =
2216 accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2217 let private_key = author.private_key();
2218
2219 let committee_id = Field::rand(rng);
2220 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2221 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2222 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2223 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2224
2225 let solution_transmission_id = (solution_id, solution_checksum).into();
2226 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2227
2228 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2229 let transmissions = [
2230 (solution_transmission_id, Transmission::Solution(solution)),
2231 (transaction_transmission_id, Transmission::Transaction(transaction)),
2232 ]
2233 .into();
2234
2235 let batch_header = BatchHeader::new(
2236 private_key,
2237 round,
2238 timestamp,
2239 committee_id,
2240 transmission_ids,
2241 previous_certificate_ids,
2242 rng,
2243 )
2244 .unwrap();
2245 let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2246 let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2247 (certificate, transmissions)
2248 }
2249
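// Stores a certificate from every account for each round in 1..round, advancing the storage as it
// goes, and returns the certificate IDs of the last stored round.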
2250 fn store_certificate_chain(
2252 primary: &Primary<CurrentNetwork>,
2253 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2254 round: u64,
2255 rng: &mut TestRng,
2256 ) -> IndexSet<Field<CurrentNetwork>> {
2257 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2258 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2259 for cur_round in 1..round {
2260 for (_, account) in accounts.iter() {
2261 let (certificate, transmissions) = create_batch_certificate(
2262 account.address(),
2263 accounts,
2264 cur_round,
2265 previous_certificates.clone(),
2266 rng,
2267 );
2268 next_certificates.insert(certificate.id());
2269 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2270 }
2271
2272 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2273 previous_certificates = next_certificates;
2274 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2275 }
2276
2277 previous_certificates
2278 }
2279
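// Registers every peer address (skipping the primary's own entry at index 0) in the gateway resolver.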
2280 fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2283 for (addr, acct) in accounts.iter().skip(1) {
2285 primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2286 }
2287 }
2288
2289 #[tokio::test]
2290 async fn test_propose_batch() {
2291 let mut rng = TestRng::default();
2292 let (primary, _) = primary_without_handlers(&mut rng);
2293
2294 assert!(primary.proposed_batch.read().is_none());
2296
2297 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2299 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2300
2301 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2303 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2304
2305 assert!(primary.propose_batch().await.is_ok());
2307 assert!(primary.proposed_batch.read().is_some());
2308 }
2309
2310 #[tokio::test]
2311 async fn test_propose_batch_with_no_transmissions() {
2312 let mut rng = TestRng::default();
2313 let (primary, _) = primary_without_handlers(&mut rng);
2314
2315 assert!(primary.proposed_batch.read().is_none());
2317
2318 assert!(primary.propose_batch().await.is_ok());
2320 assert!(primary.proposed_batch.read().is_some());
2321 }
2322
2323 #[tokio::test]
2324 async fn test_propose_batch_in_round() {
2325 let round = 3;
2326 let mut rng = TestRng::default();
2327 let (primary, accounts) = primary_without_handlers(&mut rng);
2328
2329 store_certificate_chain(&primary, &accounts, round, &mut rng);
2331
2332 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2334
2335 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2337 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2338
2339 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2341 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2342
2343 assert!(primary.propose_batch().await.is_ok());
2345 assert!(primary.proposed_batch.read().is_some());
2346 }
2347
2348 #[tokio::test]
2349 async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2350 let round = 3;
2351 let prev_round = round - 1;
2352 let mut rng = TestRng::default();
2353 let (primary, accounts) = primary_without_handlers(&mut rng);
2354 let peer_account = &accounts[1];
2355 let peer_ip = peer_account.0;
2356
2357 store_certificate_chain(&primary, &accounts, round, &mut rng);
2359
2360 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2362
2363 let mut num_transmissions_in_previous_round = 0;
2365
2366 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2368 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2369 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2370 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2371
2372 primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2374 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2375
2376 assert_eq!(primary.workers[0].num_transmissions(), 2);
2378
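// Create and store certificates for the current round, feeding their transmissions to the worker;
// after the storage advances, the next proposal must skip them.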
2379 for (_, account) in accounts.iter() {
2381 let (certificate, transmissions) = create_batch_certificate(
2382 account.address(),
2383 &accounts,
2384 round,
2385 previous_certificate_ids.clone(),
2386 &mut rng,
2387 );
2388
2389 for (transmission_id, transmission) in transmissions.iter() {
2391 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2392 }
2393
2394 num_transmissions_in_previous_round += transmissions.len();
2396 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2397 }
2398
2399 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2401
2402 assert!(primary.storage.increment_to_next_round(round).is_ok());
2404
2405 assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2407
2408 assert!(primary.propose_batch().await.is_ok());
2410
2411 let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2413 assert_eq!(proposed_transmissions.len(), 2);
2414 assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2415 assert!(
2416 proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2417 );
2418 }
2419
2420 #[tokio::test]
2421 async fn test_propose_batch_over_spend_limit() {
2422 let mut rng = TestRng::default();
2423
2424 let (accounts, committee) = sample_committee(&mut rng);
2426 let primary = primary_with_committee(
2427 0,
2428 &accounts,
2429 committee.clone(),
2430 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2431 );
2432
2433 assert!(primary.proposed_batch.read().is_none());
2435 primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2437
2438 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2440 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2441
2442 for _ in 0..5 {
2443 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2444 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2446 }
2447
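// Under consensus V4 the proposal is capped by the spend limit: only 3 of the 6 queued
// transmissions are proposed, and the rest remain in the worker.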
2448 assert!(primary.propose_batch().await.is_ok());
2450 assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2452 assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2454 }
2455
2456 #[tokio::test]
2457 async fn test_batch_propose_from_peer() {
2458 let mut rng = TestRng::default();
2459 let (primary, accounts) = primary_without_handlers(&mut rng);
2460
2461 let round = 1;
2463 let peer_account = &accounts[1];
2464 let peer_ip = peer_account.0;
2465 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2466 let proposal = create_test_proposal(
2467 &peer_account.1,
2468 primary.ledger.current_committee().unwrap(),
2469 round,
2470 Default::default(),
2471 timestamp,
2472 1,
2473 &mut rng,
2474 );
2475
2476 for (transmission_id, transmission) in proposal.transmissions() {
2478 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2479 }
2480
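// Register the peer with the gateway resolver and run block sync against its height-0 locators
// so the primary considers itself synced.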
2481 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2483
2484 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2487 primary.sync.testing_only_try_block_sync_testing_only().await;
2488
2489 assert!(
2491 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2492 );
2493 }
2494
2495 #[tokio::test]
2496 async fn test_batch_propose_from_peer_when_not_synced() {
2497 let mut rng = TestRng::default();
2498 let (primary, accounts) = primary_without_handlers(&mut rng);
2499
2500 let round = 1;
2502 let peer_account = &accounts[1];
2503 let peer_ip = peer_account.0;
2504 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2505 let proposal = create_test_proposal(
2506 &peer_account.1,
2507 primary.ledger.current_committee().unwrap(),
2508 round,
2509 Default::default(),
2510 timestamp,
2511 1,
2512 &mut rng,
2513 );
2514
2515 for (transmission_id, transmission) in proposal.transmissions() {
2517 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2518 }
2519
2520 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2522
2523 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2525
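// The peer advertises block locators at height 20 and the primary has not synced, so the
// proposal must be rejected.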
2526 assert!(
2528 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2529 );
2530 }
2531
2532 #[tokio::test]
2533 async fn test_batch_propose_from_peer_in_round() {
2534 let round = 2;
2535 let mut rng = TestRng::default();
2536 let (primary, accounts) = primary_without_handlers(&mut rng);
2537
2538 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2540
2541 let peer_account = &accounts[1];
2543 let peer_ip = peer_account.0;
2544 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2545 let proposal = create_test_proposal(
2546 &peer_account.1,
2547 primary.ledger.current_committee().unwrap(),
2548 round,
2549 previous_certificates,
2550 timestamp,
2551 1,
2552 &mut rng,
2553 );
2554
2555 for (transmission_id, transmission) in proposal.transmissions() {
2557 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2558 }
2559
2560 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2562
2563 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2566 primary.sync.testing_only_try_block_sync_testing_only().await;
2567
2568 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2570 }
2571
2572 #[tokio::test]
2573 async fn test_batch_propose_from_peer_wrong_round() {
2574 let mut rng = TestRng::default();
2575 let (primary, accounts) = primary_without_handlers(&mut rng);
2576
2577 let round = 1;
2579 let peer_account = &accounts[1];
2580 let peer_ip = peer_account.0;
2581 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2582 let proposal = create_test_proposal(
2583 &peer_account.1,
2584 primary.ledger.current_committee().unwrap(),
2585 round,
2586 Default::default(),
2587 timestamp,
2588 1,
2589 &mut rng,
2590 );
2591
2592 for (transmission_id, transmission) in proposal.transmissions() {
2594 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2595 }
2596
2597 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2599 primary.sync.testing_only_try_block_sync_testing_only().await;
2601
2602 assert!(
2604 primary
2605 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2606 round: round + 1,
2607 batch_header: Data::Object(proposal.batch_header().clone())
2608 })
2609 .await
2610 .is_err()
2611 );
2612 }
2613
2614 #[tokio::test]
2615 async fn test_batch_propose_from_peer_in_round_wrong_round() {
2616 let round = 4;
2617 let mut rng = TestRng::default();
2618 let (primary, accounts) = primary_without_handlers(&mut rng);
2619
2620 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2622
2623 let peer_account = &accounts[1];
2625 let peer_ip = peer_account.0;
2626 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2627 let proposal = create_test_proposal(
2628 &peer_account.1,
2629 primary.ledger.current_committee().unwrap(),
2630 round,
2631 previous_certificates,
2632 timestamp,
2633 1,
2634 &mut rng,
2635 );
2636
2637 for (transmission_id, transmission) in proposal.transmissions() {
2639 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2640 }
2641
2642 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2644 primary.sync.testing_only_try_block_sync_testing_only().await;
2646
2647 assert!(
2649 primary
2650 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2651 round: round + 1,
2652 batch_header: Data::Object(proposal.batch_header().clone())
2653 })
2654 .await
2655 .is_err()
2656 );
2657 }
2658
2659 #[tokio::test]
2661 async fn test_batch_propose_from_peer_with_past_timestamp() {
2662 let round = 2;
2663 let mut rng = TestRng::default();
2664 let (primary, accounts) = primary_without_handlers(&mut rng);
2665
2666 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2668
2669 let peer_account = &accounts[1];
2671 let peer_ip = peer_account.0;
2672
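// Derive a timestamp that falls short of the minimum allowed gap after the author's previous certificate.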
2673 let last_timestamp = primary
2677 .storage
2678 .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2679 .expect("No previous proposal exists")
2680 .timestamp();
2681 let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;
2682
2683 let proposal = create_test_proposal(
2684 &peer_account.1,
2685 primary.ledger.current_committee().unwrap(),
2686 round,
2687 previous_certificates,
2688 invalid_timestamp,
2689 1,
2690 &mut rng,
2691 );
2692
2693 for (transmission_id, transmission) in proposal.transmissions() {
2695 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2696 }
2697
2698 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2700 primary.sync.testing_only_try_block_sync_testing_only().await;
2702
2703 assert!(
2705 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2706 );
2707 }
2708
2709 #[tokio::test]
2711 async fn test_batch_propose_from_peer_over_spend_limit() {
2712 let mut rng = TestRng::default();
2713
2714 let (accounts, committee) = sample_committee(&mut rng);
2716 let primary_v4 = primary_with_committee(
2717 0,
2718 &accounts,
2719 committee.clone(),
2720 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2721 );
2722 let primary_v5 = primary_with_committee(
2723 1,
2724 &accounts,
2725 committee.clone(),
2726 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
2727 );
2728
2729 let round = 1;
2731 let peer_account = &accounts[2];
2732 let peer_ip = peer_account.0;
2733
2734 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2735
2736 let proposal =
2737 create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);
2738
2739 for (transmission_id, transmission) in proposal.transmissions() {
2741 primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2742 primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2743 }
2744
2745 primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2747 primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2748
2749 primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2751 primary_v4.sync.testing_only_try_block_sync_testing_only().await;
2752
2753 primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2755 primary_v5.sync.testing_only_try_block_sync_testing_only().await;
2756
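// The same over-limit proposal is accepted by the V4 primary but rejected by the V5 primary.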
2757 assert!(
2759 primary_v4
2760 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2761 .await
2762 .is_ok()
2763 );
2764
2765 assert!(
2766 primary_v5
2767 .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2768 .await
2769 .is_err()
2770 );
2771 }
2772
2773 #[tokio::test]
2774 async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2775 let round = 3;
2776 let mut rng = TestRng::default();
2777 let (primary, _) = primary_without_handlers(&mut rng);
2778
2779 assert!(primary.proposed_batch.read().is_none());
2781
2782 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2784 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2785
2786 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2788 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2789
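// Artificially advance the propose lock past the storage round; propose_batch should then
// succeed without producing a proposal.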
2790 let old_proposal_lock_round = *primary.propose_lock.lock().await;
2792 *primary.propose_lock.lock().await = round + 1;
2793
2794 assert!(primary.propose_batch().await.is_ok());
2796 assert!(primary.proposed_batch.read().is_none());
2797
2798 *primary.propose_lock.lock().await = old_proposal_lock_round;
2800
2801 assert!(primary.propose_batch().await.is_ok());
2803 assert!(primary.proposed_batch.read().is_some());
2804 }
2805
2806 #[tokio::test]
2807 async fn test_propose_batch_with_storage_round_behind_proposal() {
2808 let round = 5;
2809 let mut rng = TestRng::default();
2810 let (primary, accounts) = primary_without_handlers(&mut rng);
2811
2812 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2814
2815 let timestamp = now();
2817 let proposal = create_test_proposal(
2818 primary.gateway.account(),
2819 primary.ledger.current_committee().unwrap(),
2820 round + 1,
2821 previous_certificates,
2822 timestamp,
2823 1,
2824 &mut rng,
2825 );
2826
2827 *primary.proposed_batch.write() = Some(proposal);
2829
2830 assert!(primary.propose_batch().await.is_ok());
2832 assert!(primary.proposed_batch.read().is_some());
2833 assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2834 }
2835
2836 #[tokio::test(flavor = "multi_thread")]
2837 async fn test_batch_signature_from_peer() {
2838 let mut rng = TestRng::default();
2839 let (primary, accounts) = primary_without_handlers(&mut rng);
2840 map_account_addresses(&primary, &accounts);
2841
2842 let round = 1;
2844 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2845 let proposal = create_test_proposal(
2846 primary.gateway.account(),
2847 primary.ledger.current_committee().unwrap(),
2848 round,
2849 Default::default(),
2850 timestamp,
2851 1,
2852 &mut rng,
2853 );
2854
2855 *primary.proposed_batch.write() = Some(proposal);
2857
2858 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2860
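// Processing signatures from all other committee members reaches quorum, certifies the batch,
// and advances the round.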
2861 for (socket_addr, signature) in signatures {
2863 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2864 }
2865
2866 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2868 assert_eq!(primary.current_round(), round + 1);
2870 }
2871
2872 #[tokio::test(flavor = "multi_thread")]
2873 async fn test_batch_signature_from_peer_in_round() {
2874 let round = 5;
2875 let mut rng = TestRng::default();
2876 let (primary, accounts) = primary_without_handlers(&mut rng);
2877 map_account_addresses(&primary, &accounts);
2878
2879 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2881
2882 let timestamp = now();
2884 let proposal = create_test_proposal(
2885 primary.gateway.account(),
2886 primary.ledger.current_committee().unwrap(),
2887 round,
2888 previous_certificates,
2889 timestamp,
2890 1,
2891 &mut rng,
2892 );
2893
2894 *primary.proposed_batch.write() = Some(proposal);
2896
2897 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2899
2900 for (socket_addr, signature) in signatures {
2902 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2903 }
2904
2905 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2907 assert_eq!(primary.current_round(), round + 1);
2909 }
2910
2911 #[tokio::test]
2912 async fn test_batch_signature_from_peer_no_quorum() {
2913 let mut rng = TestRng::default();
2914 let (primary, accounts) = primary_without_handlers(&mut rng);
2915 map_account_addresses(&primary, &accounts);
2916
2917 let round = 1;
2919 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2920 let proposal = create_test_proposal(
2921 primary.gateway.account(),
2922 primary.ledger.current_committee().unwrap(),
2923 round,
2924 Default::default(),
2925 timestamp,
2926 1,
2927 &mut rng,
2928 );
2929
2930 *primary.proposed_batch.write() = Some(proposal);
2932
2933 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2935
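// A single signature is not enough for quorum, so no certificate is created and the round does not advance.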
2936 let (socket_addr, signature) = signatures.first().unwrap();
2938 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2939
2940 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2942 assert_eq!(primary.current_round(), round);
2944 }
2945
2946 #[tokio::test]
2947 async fn test_batch_signature_from_peer_in_round_no_quorum() {
2948 let round = 7;
2949 let mut rng = TestRng::default();
2950 let (primary, accounts) = primary_without_handlers(&mut rng);
2951 map_account_addresses(&primary, &accounts);
2952
2953 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2955
2956 let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2958 let proposal = create_test_proposal(
2959 primary.gateway.account(),
2960 primary.ledger.current_committee().unwrap(),
2961 round,
2962 previous_certificates,
2963 timestamp,
2964 1,
2965 &mut rng,
2966 );
2967
2968 *primary.proposed_batch.write() = Some(proposal);
2970
2971 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2973
2974 let (socket_addr, signature) = signatures.first().unwrap();
2976 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2977
2978 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2980 assert_eq!(primary.current_round(), round);
2982 }
2983
2984 #[tokio::test]
2985 async fn test_insert_certificate_with_aborted_transmissions() {
2986 let round = 3;
2987 let prev_round = round - 1;
2988 let mut rng = TestRng::default();
2989 let (primary, accounts) = primary_without_handlers(&mut rng);
2990 let peer_account = &accounts[1];
2991 let peer_ip = peer_account.0;
2992
2993 store_certificate_chain(&primary, &accounts, round, &mut rng);
2995
2996 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2998
2999 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
3001 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
3002
3003 primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
3005 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
3006
3007 assert_eq!(primary.workers[0].num_transmissions(), 2);
3009
3010 let account = accounts[0].1.clone();
3012 let (certificate, transmissions) =
3013 create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3014 let certificate_id = certificate.id();
3015
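// Randomly split the certificate's transmissions into aborted and available ones, guaranteeing
// at least one aborted transmission.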
3016 let mut aborted_transmissions = HashSet::new();
3018 let mut transmissions_without_aborted = HashMap::new();
3019 for (transmission_id, transmission) in transmissions.clone() {
3020 match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
3021 true => {
3022 aborted_transmissions.insert(transmission_id);
3024 }
3025 false => {
3026 transmissions_without_aborted.insert(transmission_id, transmission);
3028 }
3029 };
3030 }
3031
3032 for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3034 primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3035 }
3036
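// Without declaring the aborted transmissions, both checking and inserting the certificate must fail.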
3037 assert!(
3039 primary
3040 .storage
3041 .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3042 .is_err()
3043 );
3044 assert!(
3045 primary
3046 .storage
3047 .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3048 .is_err()
3049 );
3050
3051 primary
3053 .storage
3054 .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3055 .unwrap();
3056
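// The certificate is now stored; the aborted transmission IDs are tracked but carry no transmission data.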
3057 assert!(primary.storage.contains_certificate(certificate_id));
3059 for aborted_transmission_id in aborted_transmissions {
3061 assert!(primary.storage.contains_transmission(aborted_transmission_id));
3062 assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3063 }
3064 }
3065}