1mod proposal_task;
17pub use proposal_task::ProposalTask;
18
19use crate::{
20 Gateway,
21 MAX_BATCH_DELAY,
22 MAX_LEADER_CERTIFICATE_DELAY,
23 MAX_WORKERS,
24 MIN_BATCH_DELAY,
25 PRIMARY_PING_INTERVAL,
26 Sync,
27 Transport,
28 WORKER_PING_INTERVAL,
29 Worker,
30 events::{BatchPropose, BatchSignature, Event},
31 helpers::{
32 PrimaryReceiver,
33 PrimarySender,
34 Proposal,
35 ProposalCache,
36 SignedProposals,
37 Storage,
38 assign_to_worker,
39 assign_to_workers,
40 fmt_id,
41 init_sync_channels,
42 init_worker_channels,
43 now,
44 },
45 spawn_blocking,
46 sync::SyncCallback,
47};
48
49use snarkos_account::Account;
50use snarkos_node_bft_events::PrimaryPing;
51use snarkos_node_bft_ledger_service::LedgerService;
52#[cfg(test)]
53use snarkos_node_network::ConnectionMode;
54use snarkos_node_network::PeerPoolHandling;
55use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
56use snarkos_utilities::{CallbackHandle, NodeDataDir};
57
58use snarkvm::{
59 console::{
60 prelude::*,
61 types::{Address, Field},
62 },
63 ledger::{
64 block::Transaction,
65 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
66 puzzle::{Solution, SolutionID},
67 },
68 prelude::{Signature, committee::Committee},
69 utilities::flatten_error,
70};
71
72use anyhow::Context;
73use colored::Colorize;
74use futures::{
75 future::join_all,
76 stream::{FuturesUnordered, StreamExt},
77};
78use indexmap::{IndexMap, IndexSet};
79#[cfg(feature = "locktick")]
80use locktick::{
81 parking_lot::{Mutex, RwLock},
82 tokio::RwLock as TRwLock,
83};
84#[cfg(not(feature = "locktick"))]
85use parking_lot::{Mutex, RwLock};
86#[cfg(not(feature = "serial"))]
87use rayon::prelude::*;
88use std::{
89 collections::{HashMap, HashSet},
90 future::Future,
91 net::SocketAddr,
92 pin::Pin,
93 sync::{Arc, OnceLock},
94 time::Instant,
95};
96#[cfg(not(feature = "locktick"))]
97use tokio::sync::RwLock as TRwLock;
98use tokio::{sync::Notify, task::JoinHandle};
99
/// The state of this primary's own batch proposal.
#[derive(Debug, PartialEq, Eq)]
pub enum ProposedBatchState<N: Network> {
    /// No proposal is being tracked.
    None,
    /// A proposal has been created and is still collecting signatures; holds the boxed proposal.
    Certifying(Box<Proposal<N>>),
    /// The proposal with this batch ID reached the quorum threshold.
    Certified(Field<N>),
}
111
112impl<N: Network> Default for ProposedBatchState<N> {
113 fn default() -> Self {
114 Self::None
115 }
116}
117
118impl<N: Network> ProposedBatchState<N> {
119 pub fn is_none(&self) -> bool {
121 matches!(self, Self::None)
122 }
123
124 pub fn is_proposed(&self) -> bool {
126 matches!(self, Self::Certifying(_))
127 }
128
129 pub fn as_proposal(&self) -> Option<&Proposal<N>> {
131 match self {
132 Self::Certifying(p) => Some(p.as_ref()),
133 _ => None,
134 }
135 }
136}
137
/// A [`ProposedBatchState`] guarded by a read-write lock.
pub type ProposedBatch<N> = RwLock<ProposedBatchState<N>>;
140
/// Callbacks that the primary invokes on an externally registered component.
///
/// `std::marker::Sync` is spelled out in full because this module imports the crate's own `Sync` type.
#[async_trait::async_trait]
pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync {
    /// Called with the primary's current round when it finds its own certificate already stored for that round.
    /// When this returns `false`, the caller in `propose_batch` gives up on the proposal attempt.
    /// NOTE(review): presumably `true` means the round was advanced - confirm against the implementor.
    fn try_advance_to_next_round(&self, current_round: u64) -> bool;

    /// Hands a batch certificate to the callback.
    /// NOTE(review): the call sites are outside this part of the file - confirm when this is invoked.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
}
157
/// The primary: proposes batches, signs peers' batches, and processes incoming certificates.
///
/// The type is `Clone`; the state held in `Arc` fields is shared between clones.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, built on top of the gateway, storage, and ledger.
    sync: Sync<N>,
    /// The gateway, used to send and broadcast events and to disconnect peers.
    gateway: Gateway<N>,
    /// The storage for batches, certificates, and transmissions.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers; set exactly once by `run`.
    workers: Arc<OnceLock<Vec<Worker<N>>>>,

    /// The optional callback registered in `run`.
    primary_callback: Arc<CallbackHandle<Arc<dyn PrimaryCallback<N>>>>,

    /// The state of our own batch proposal; a clone of this handle is given to every worker.
    proposed_batch: Arc<ProposedBatch<N>>,

    /// The instant at which the latest batch was proposed (metrics only).
    #[cfg(feature = "metrics")]
    batch_propose_start: Arc<Mutex<Option<Instant>>>,

    /// The `(round, timestamp)` of our latest proposal.
    /// Its write lock is held for the whole of `propose_batch`.
    latest_proposal_timestamp: Arc<TRwLock<Option<(u64, i64)>>>,

    /// The proposals from peers that this primary has signed.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,

    /// Join handles of background tasks.
    /// NOTE(review): presumably filled by `self.spawn`, which is not visible here - confirm.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,

    /// The node's data directory, used to locate the proposal cache.
    node_data_dir: NodeDataDir,

    /// The proposal task (see the `proposal_task` module).
    proposal_task: ProposalTask<N>,

    /// Notified when an incoming certificate completes a quorum for a round at or past the current one.
    round_increment_notify: Arc<Notify>,
}
205
206impl<N: Network> Primary<N> {
    /// Twice the maximum number of transmissions allowed in a single batch.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
209
210 #[allow(clippy::too_many_arguments)]
212 pub fn new(
213 account: Account<N>,
214 storage: Storage<N>,
215 ledger: Arc<dyn LedgerService<N>>,
216 block_sync: Arc<BlockSync<N>>,
217 ip: Option<SocketAddr>,
218 trusted_validators: &[SocketAddr],
219 trusted_peers_only: bool,
220 node_data_dir: NodeDataDir,
221 dev: Option<u16>,
222 ) -> Result<Self> {
223 let gateway = Gateway::new(
225 account,
226 storage.clone(),
227 ledger.clone(),
228 ip,
229 trusted_validators,
230 trusted_peers_only,
231 node_data_dir.clone(),
232 dev,
233 )?;
234 let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
236
237 Ok(Self {
239 sync,
240 gateway,
241 storage,
242 ledger,
243 node_data_dir,
244 workers: Default::default(),
245 primary_callback: Default::default(),
246 proposed_batch: Default::default(),
247 #[cfg(feature = "metrics")]
248 batch_propose_start: Default::default(),
249 latest_proposal_timestamp: Default::default(),
250 signed_proposals: Default::default(),
251 handles: Default::default(),
252 proposal_task: Default::default(),
253 round_increment_notify: Default::default(),
254 })
255 }
256
257 async fn load_proposal_cache(&self) -> Result<()> {
259 match ProposalCache::<N>::exists(&self.node_data_dir) {
261 true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
263 Ok(proposal_cache) => {
264 let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
266 proposal_cache.into();
267
268 *self.latest_proposal_timestamp.write().await = Some((latest_certificate_round, now()));
269 *self.proposed_batch.write() = match proposed_batch {
270 Some(p) => ProposedBatchState::Certifying(Box::new(p)),
271 None => ProposedBatchState::None,
272 };
273 *self.signed_proposals.write() = signed_proposals;
274
275 for certificate in pending_certificates {
277 let batch_id = certificate.batch_id();
278 if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
282 {
283 let err = err.context(format!(
284 "Failed to load stored certificate {} from proposal cache",
285 fmt_id(batch_id)
286 ));
287 warn!("{}", &flatten_error(err));
288 }
289 }
290 Ok(())
291 }
292 Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
293 },
294 false => Ok(()),
296 }
297 }
298
299 pub async fn run(
301 &self,
302 ping: Option<Arc<Ping<N>>>,
303 primary_callback: Option<Arc<dyn PrimaryCallback<N>>>,
304 sync_callback: Option<Arc<dyn SyncCallback<N>>>,
305 primary_sender: PrimarySender<N>,
306 primary_receiver: PrimaryReceiver<N>,
307 ) -> Result<()> {
308 info!("Starting the primary instance of the memory pool...");
309
310 if let Some(callback) = primary_callback {
312 self.primary_callback.set(callback)?;
313 }
314
315 let mut worker_senders = IndexMap::new();
317 let mut workers = Vec::new();
319 for id in 0..MAX_WORKERS {
321 let (tx_worker, rx_worker) = init_worker_channels();
323 let worker = Worker::new(
325 id,
326 Arc::new(self.gateway.clone()),
327 self.storage.clone(),
328 self.ledger.clone(),
329 self.proposed_batch.clone(),
330 )?;
331 worker.run(rx_worker);
333 workers.push(worker);
335 worker_senders.insert(id, tx_worker);
337 }
338 if self.workers.set(workers).is_err() {
340 bail!("Workers already set. `Primary::run` cannot be called more than once.");
341 }
342
343 let (sync_sender, sync_receiver) = init_sync_channels();
345 self.sync.initialize(sync_callback)?;
347 self.load_proposal_cache().await?;
349 self.sync.run(ping, sync_receiver).await?;
351 self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
353 self.start_handlers(primary_receiver);
356
357 Ok(())
358 }
359
    /// Returns the current round, as reported by the storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }
364
    /// Returns `true` if the sync module reports that the node is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }
369
    /// Returns a reference to the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }
374
    /// Returns a reference to the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }
379
    /// Returns a reference to the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
384
385 pub fn num_workers(&self) -> u8 {
387 u8::try_from(self.workers.get().expect("Primary is not running yet").len()).expect("Too many workers")
388 }
389
    /// Returns the workers.
    ///
    /// # Panics
    /// Panics if the workers have not been set yet (they are set by `run`).
    pub fn workers(&self) -> &[Worker<N>] {
        self.workers.get().expect("Primary is not running yet")
    }
394}
395
396impl<N: Network> Primary<N> {
397 pub fn num_unconfirmed_transmissions(&self) -> usize {
399 self.workers().iter().map(|worker| worker.num_transmissions()).sum()
400 }
401
402 pub fn num_unconfirmed_ratifications(&self) -> usize {
404 self.workers().iter().map(|worker| worker.num_ratifications()).sum()
405 }
406
407 pub fn num_unconfirmed_solutions(&self) -> usize {
409 self.workers().iter().map(|worker| worker.num_solutions()).sum()
410 }
411
412 pub fn num_unconfirmed_transactions(&self) -> usize {
414 self.workers().iter().map(|worker| worker.num_transactions()).sum()
415 }
416}
417
418impl<N: Network> Primary<N> {
419 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
421 self.workers().iter().flat_map(|worker| worker.transmission_ids())
422 }
423
424 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
426 self.workers().iter().flat_map(|worker| worker.transmissions())
427 }
428
429 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
431 self.workers().iter().flat_map(|worker| worker.solutions())
432 }
433
434 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
436 self.workers().iter().flat_map(|worker| worker.transactions())
437 }
438}
439
440impl<N: Network> Primary<N> {
441 pub fn clear_worker_solutions(&self) {
443 self.workers().iter().for_each(Worker::clear_solutions);
444 }
445}
446
447#[async_trait::async_trait]
448impl<N: Network> proposal_task::BatchPropose for Primary<N> {
    /// Returns the current round by delegating to the inherent `Primary::current_round`.
    fn current_round(&self) -> u64 {
        Primary::current_round(self)
    }
452
    /// Forwards to the sync module's `wait_for_synced_if_syncing`.
    /// NOTE(review): presumably `None` means the node is already synced - confirm against `Sync`.
    fn wait_for_synced_if_syncing(&self) -> Option<futures::future::BoxFuture<'_, ()>> {
        self.sync.wait_for_synced_if_syncing()
    }
456
457 fn is_synced(&self) -> bool {
458 self.sync.is_synced()
459 }
460
    /// Attempts to propose a new batch for the current round.
    ///
    /// Returns `Ok(true)` only if a new batch was created and broadcast. Returns `Ok(false)` whenever
    /// the attempt is skipped: an existing proposal is still pending (it is re-sent to the non-signers),
    /// a batch is being certified, the round was already proposed or certified, too few validators are
    /// connected, the previous round lacks quorum, or the timestamp check does not pass.
    ///
    /// # Errors
    /// Returns an error if the round is 0, if a committee lookback cannot be fetched, if a transaction
    /// cannot be deserialized, or if the batch header or proposal cannot be constructed.
    async fn propose_batch(&self) -> Result<bool> {
        // Take the write lock on the latest proposal timestamp; the guard lives until the end of this function.
        let mut lock_guard = self.latest_proposal_timestamp.write().await;

        // A failed expiration check is logged and treated as "nothing proposed", not as an error.
        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(false);
        }

        // Retrieve the current round and the one before it.
        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // Do not propose for a round that is behind the round of the latest recorded proposal.
        if let Some((latest_round, _)) = &*lock_guard
            && round < *latest_round
        {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {latest_round}");
            return Ok(false);
        }

        // Handle an already existing proposal.
        match &*self.proposed_batch.read() {
            ProposedBatchState::Certifying(proposal) => {
                // The storage must be at (or past) the proposal's round and hold all of the
                // previous certificates that the proposal refers to.
                if round < proposal.round()
                    || proposal
                        .batch_header()
                        .previous_certificate_ids()
                        .iter()
                        .any(|id| !self.storage.contains_certificate(*id))
                {
                    warn!(
                        "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                        proposal.round(),
                    );
                    return Ok(false);
                }
                // Re-send the pending proposal to every non-signer whose peer IP can be resolved.
                let event = Event::BatchPropose(proposal.batch_header().clone().into());
                for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                    match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                        Some(peer_ip) => {
                            let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                            // Send from a separate task so this function does not wait on the peer.
                            tokio::spawn(async move {
                                debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                                if gateway.send(peer_ip, event_).await.is_none() {
                                    warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                                }
                            });
                        }
                        None => continue,
                    }
                }
                debug!("Proposed batch for round {} is still valid", proposal.round());
                return Ok(false);
            }
            ProposedBatchState::Certified(_) => {
                debug!("Cannot propose a batch for round {round} - a batch is currently being certified");
                return Ok(false);
            }
            ProposedBatchState::None => {
                // No pending proposal; continue below.
            }
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Check the timestamp of our latest proposal against the current time.
        if let Some((_, latest_timestamp)) = &*lock_guard
            && !self.check_own_proposal_timestamp(previous_round, *latest_timestamp, now())?
        {
            return Ok(false);
        }

        // If our own certificate for this round is already stored, ask the callback (if any) to advance.
        // Either way, no new batch is proposed for this round.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            if let Some(cb) = &*self.primary_callback.get_ref() {
                match cb.try_advance_to_next_round(self.current_round()) {
                    true => (),
                    false => return Ok(false),
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(false);
        }

        // Skip if a proposal was already made for this round.
        if let Some((latest_round, _)) = &*lock_guard
            && *latest_round == round
        {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(false);
        }

        // Retrieve the committee lookback for the round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Ensure the connected validators, together with ourselves, reach the quorum threshold.
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                // Subtract one to exclude ourselves, inserted just above.
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(false);
            }
        }

        // Retrieve the certificates of the previous round.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Previous round 0 needs no certificates; otherwise the previous round must have reached quorum.
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
            // On a test network, also treat rounds up to the dev committee's starting round as ready.
            #[cfg(feature = "test_network")]
            {
                if let Some(dev_committee) = self.ledger.dev_committee_for_round(previous_round)? {
                    if round <= dev_committee.starting_round() {
                        is_ready = true;
                    }
                }
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(false);
        }

        // The transmissions selected for the proposal, and their accumulated cost.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        // The selection below is written under the assumption of a single worker.
        debug_assert_eq!(MAX_WORKERS, 1);

        // Drain transmissions from the front of each worker until a limit is reached.
        // NOTE(review): the `?` exits inside this loop (deserialization, consensus-version lookup) return
        // without reinserting the transmissions already taken from the worker - confirm this is intended.
        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Stop once the batch is full; put the transmission back first.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Move on to the next worker once this worker's limit is reached; put the transmission back first.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Skip transmissions already in the ledger; a failed lookup is treated as "already contained".
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Skip transmissions already in storage, except while the proposal is still empty.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Validate the transmission according to its kind.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // The checksum in the ID must match the checksum of the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // The checksum in the ID must match the checksum of the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction if it is still a buffer, reading at most the maximum transaction size.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(
                                    &mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64),
                                )?),
                            }
                        })?;

                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;

                        // Compute the cost of the transaction; skip it if the cost cannot be computed.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Accumulate the cost, guarding against overflow.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Stop once the batch spend limit would be exceeded; put the transmission back first.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    // Ratifications are never included.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // An ID whose kind does not match its transmission is skipped.
                    _ => continue,
                }

                // The transmission passed all checks; include it in the proposal.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Record the round and timestamp of this proposal before the batch header is constructed.
        *lock_guard = Some((round, current_timestamp));
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header, then build the proposal from it.
        // On failure, the selected transmissions are put back into the workers.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;

        // Broadcast the batch header, then record the proposal as being certified.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
        #[cfg(feature = "metrics")]
        {
            // Record when the proposal was made.
            *self.batch_propose_start.lock() = Some(Instant::now());
        }

        Ok(true)
    }
834}
835
836impl<N: Network> Primary<N> {
    /// Processes a batch proposal received from a peer and, if it is valid, signs it and
    /// sends the signature back to that peer.
    ///
    /// Returns `Ok(())` both when the batch was signed and when the proposal was skipped
    /// (already signed, already in storage, invalid transmission, not a signing round).
    ///
    /// # Errors
    /// Returns an error - in most cases after disconnecting the peer - if the proposal is
    /// inconsistent (round, author, committee ID, timestamp, ratification), conflicts with a
    /// batch already signed for that author, or if any of the fallible steps fails.
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header, and ensure the announced round matches the header's round.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        let batch_author = batch_header.author();

        // The batch must be authored by the validator behind this peer IP.
        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // The author must be an authorized validator.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // The author must not be this primary.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // The committee ID in the header must match the committee lookback for the round.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // Compare against the batch we last signed for this author, if any.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // The proposal is for a round older than the one already signed.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // The proposal is a different batch for the round already signed.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // The proposal is the very batch already signed: re-send the stored signature.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        // Skip batches that are already in storage.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Check the proposal timestamp; a failure is treated as malicious behavior.
        let previous_round = batch_round.saturating_sub(1);
        if let Err(err) = self.check_peer_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
        }

        // Ratifications are not supported in batch proposals.
        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
            );
        }

        // Sync with the batch header, obtaining the transmissions that were missing.
        let mut missing_transmissions =
            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;

        // Every missing transmission must be well-formed; otherwise the proposal is skipped without an error.
        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            let err = err.context(format!(
                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
            ));
            debug!("{}", flatten_error(err));
            return Ok(());
        }

        // Skip the proposal if its round is not one this primary signs for.
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Clone the handles that are moved into the blocking task.
        let (storage, header) = (self.storage.clone(), batch_header.clone());

        // Check the batch header against storage; `None` means there is nothing further to do.
        let Some(missing_transmissions) =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
        else {
            return Ok(());
        };

        // Hand the missing transmissions to the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // Sign the batch ID.
        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::rng()))?;

        // Record the signed proposal for this author.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // A batch was already signed for this author and round (recorded after the check above);
                // return without sending this new signature.
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Send the signature back to the peer from a separate task.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }
1043
1044 fn add_signature_to_batch(
1055 &self,
1056 state: ProposedBatchState<N>,
1057 peer_ip: SocketAddr,
1058 batch_id: Field<N>,
1059 signature: Signature<N>,
1060 ) -> (Result<Option<Proposal<N>>>, ProposedBatchState<N>) {
1061 match state {
1062 ProposedBatchState::Certifying(mut proposal) if proposal.batch_id() == batch_id => {
1063 let inner: Result<bool> = (|| {
1066 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1067 let Some(signer) = self.gateway.resolve_to_aleo_addr(peer_ip) else {
1068 bail!("Signature is from a disconnected validator");
1069 };
1070 let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1071 if new_signature {
1072 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1073 Ok(proposal.is_quorum_threshold_reached(&committee_lookback))
1074 } else {
1075 debug!(
1076 "Received duplicated signature from '{peer_ip}' for batch \
1077 {batch_id} in round {round}",
1078 round = proposal.round()
1079 );
1080 Ok(false)
1081 }
1082 })();
1083 match inner {
1084 Ok(true) => {
1085 let certified_id = proposal.batch_id();
1086 (Ok(Some(*proposal)), ProposedBatchState::Certified(certified_id))
1087 }
1088 Ok(false) => (Ok(None), ProposedBatchState::Certifying(proposal)),
1089 Err(e) => (Err(e), ProposedBatchState::Certifying(proposal)),
1090 }
1091 }
1092 ProposedBatchState::Certifying(proposal) => {
1093 if self.storage.contains_batch(batch_id) {
1095 debug!(
1096 "Primary is safely skipping a batch signature from {peer_ip} for \
1097 round {} - batch is already certified",
1098 proposal.round()
1099 );
1100 (Ok(None), ProposedBatchState::Certifying(proposal))
1101 } else {
1102 let expected_id = proposal.batch_id();
1103 let round = proposal.round();
1104 (
1105 Err(anyhow!("Unknown batch ID '{batch_id}', expected '{expected_id}' for round {round}")),
1106 ProposedBatchState::Certifying(proposal),
1107 )
1108 }
1109 }
1110 ProposedBatchState::Certified(id) if id == batch_id => {
1111 debug!(
1113 "Skipping batch signature from {peer_ip} for batch '{batch_id}' - \
1114 already received sufficient signatures"
1115 );
1116 (Ok(None), ProposedBatchState::Certified(id))
1117 }
1118 ProposedBatchState::Certified(id) => {
1119 let result = if self.storage.contains_batch(batch_id) {
1120 warn!("Received signature for an older batch {batch_id}");
1122 Ok(None)
1123 } else {
1124 Err(anyhow!("Unknown batch ID '{batch_id}'"))
1125 };
1126
1127 (result, ProposedBatchState::Certified(id))
1128 }
1129 ProposedBatchState::None => {
1130 let result = if self.storage.contains_batch(batch_id) {
1131 warn!("Received signature for an older batch {batch_id}");
1133 Ok(None)
1134 } else {
1135 Err(anyhow!("Unknown batch ID '{batch_id}'"))
1136 };
1137
1138 (result, ProposedBatchState::None)
1139 }
1140 }
1141 }
1142
1143 async fn process_batch_signature_from_peer(
1152 &self,
1153 peer_ip: SocketAddr,
1154 batch_signature: BatchSignature<N>,
1155 ) -> Result<()> {
1156 self.check_proposed_batch_for_expiration()?;
1158
1159 let BatchSignature { batch_id, signature } = batch_signature;
1161
1162 let signer = signature.to_address();
1164
1165 if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1167 self.gateway.disconnect(peer_ip);
1169 bail!("Malicious peer - batch signature is from a different validator ({signer})");
1170 }
1171 if self.gateway.account().address() == signer {
1173 bail!("Invalid peer - received a batch signature from myself ({signer})");
1174 }
1175
1176 let self_ = self.clone();
1177 let Some(proposal) = spawn_blocking!({
1178 let mut proposed_batch = self_.proposed_batch.write();
1180
1181 let (result, new_state) =
1182 self_.add_signature_to_batch(std::mem::take(&mut *proposed_batch), peer_ip, batch_id, signature);
1183 *proposed_batch = new_state;
1184 result
1185 })?
1186 else {
1187 return Ok(());
1188 };
1189
1190 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1193
1194 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1196 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1199 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1201 return Err(e);
1202 }
1203
1204 #[cfg(feature = "metrics")]
1205 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1206 Ok(())
1207 }
1208
1209 async fn process_batch_certificate_from_peer(
1216 &self,
1217 peer_ip: SocketAddr,
1218 certificate: BatchCertificate<N>,
1219 ) -> Result<()> {
1220 if !self.gateway.is_authorized_validator_ip(peer_ip) {
1222 self.gateway.disconnect(peer_ip);
1224 bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1225 }
1226 if self.storage.contains_certificate(certificate.id()) {
1228 return Ok(());
1229 } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1231 self.storage.insert_unprocessed_certificate(certificate.clone())?;
1232 }
1233
1234 let author = certificate.author();
1236 let certificate_round = certificate.round();
1238 let committee_id = certificate.committee_id();
1240
1241 if self.gateway.account().address() == author {
1243 bail!("Received a batch certificate for myself ({author})");
1244 }
1245
1246 self.storage.check_incoming_certificate(&certificate)?;
1248
1249 self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1261
1262 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1267
1268 let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1270 let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1272
1273 let expected_committee_id = committee_lookback.id();
1275 if expected_committee_id != committee_id {
1276 self.gateway.disconnect(peer_ip);
1278 bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1279 }
1280
1281 let should_advance = match &*self.latest_proposal_timestamp.read().await {
1285 Some((latest_round, _)) => *latest_round < certificate_round,
1287 None => true,
1289 };
1290
1291 let current_round = self.current_round();
1293
1294 if is_quorum && should_advance && certificate_round >= current_round {
1296 self.round_increment_notify.notify_one();
1298 }
1299 Ok(())
1300 }
1301}
1302
1303impl<N: Network> Primary<N> {
    /// Starts the long-running handler tasks of the primary.
    ///
    /// The receiver is split into its individual channels, and one task is spawned (via `Self::spawn`)
    /// per concern: sending primary pings, receiving primary pings, sending worker pings, running the
    /// proposal task, receiving batch proposals / signatures / certificates, advancing the round, and
    /// forwarding unconfirmed solutions and transactions to the workers.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Task: periodically broadcast a primary ping carrying our block locators and our latest certificate.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Wait for the ping interval before (re)trying.
                tokio::time::sleep(PRIMARY_PING_INTERVAL).await;

                // Retrieve the block locators through `spawn_blocking!`.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Find the most recent certificate authored by this primary.
                let primary_certificate = {
                    let primary_address = self_.gateway.account().address();

                    // Walk backwards from the current round until a certificate is found or round 0 is reached.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // Round 0 is never queried; stop without a certificate.
                        if current_round == 0 {
                            break;
                        }
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Without a certificate of our own there is nothing to ping with; retry after the next interval.
                    match certificate {
                        Some(certificate) => certificate,
                        None => continue,
                    }
                };

                // Construct and broadcast the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Task: process the primary certificates received in primary pings.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // Pings are dropped while the node is syncing.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Deserialize and process the certificate in a separate (detached) task,
                // so the receive loop is not held up by a single ping.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Capture the ID and round up front, as the certificate is moved into the call below.
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Task: periodically have every worker broadcast a ping.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(WORKER_PING_INTERVAL).await;
                // Worker pings are skipped while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                for worker in self_.workers() {
                    worker.broadcast_ping();
                }
            }
        });

        // Task: run the proposal task.
        let proposal_task = self.proposal_task.clone();
        let self_ = self.clone();
        self.spawn(async move { proposal_task.run(self_).await });

        // Task: process batch proposals received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // Proposals are dropped while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Each proposal is handled in its own (detached) task.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Capture the round up front, as the proposal is moved into the call below.
                    let round = batch_propose.round;
                    if let Err(err) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        let err = err.context(format!("Cannot sign a batch at round {round} from '{peer_ip}'"));
                        warn!("{}", flatten_error(err));
                    }
                });
            }
        });

        // Task: process batch signatures received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // Signatures are dropped while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Unlike proposals and certificates, signatures are awaited inline in this loop (no new task).
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Task: process certified batches received from peers.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // Certificates are dropped while the node is syncing.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Deserialize and process each certificate in its own (detached) task.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Capture the ID and round up front, as the certificate is moved into the call below.
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // Task: try to advance to the next round once the current round has reached quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Remember when this round was first observed, to bound the batch delay below.
                let round_start = Instant::now();
                let current_round = self_.current_round();

                // Keep retrying for as long as the primary remains in this round.
                while self_.current_round() == current_round {
                    // Wake up on whichever of the following completes first: an explicit notification, ...
                    let mut futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> =
                        vec![Box::pin(self_.round_increment_notify.notified())];

                    // ... the remainder of the maximum batch delay (only if some of it is left), ...
                    if let Some(remaining_delay) = MAX_BATCH_DELAY.checked_sub(round_start.elapsed())
                        && !remaining_delay.is_zero()
                    {
                        futures.push(Box::pin(tokio::time::sleep(remaining_delay)));
                    }
                    // ... the maximum leader certificate delay, ...
                    futures.push(Box::pin(tokio::time::sleep(MAX_LEADER_CERTIFICATE_DELAY)));
                    // ... or, if the node is currently syncing, the node becoming synced.
                    if !self_.sync.is_synced() {
                        futures.push(Box::pin(self_.sync.wait_for_synced()));
                    }
                    let _ = futures::future::select_all(futures).await;

                    // The round is not advanced while the node is syncing.
                    if !self_.sync.is_synced() {
                        trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                        continue;
                    }

                    let next_round = current_round.saturating_add(1);
                    // Determine whether the certificate authors of the current round form a quorum.
                    let is_quorum_threshold_reached = {
                        let authors = self_.storage.get_certificate_authors_for_round(current_round);
                        // No certificates yet for this round; wait again.
                        if authors.is_empty() {
                            continue;
                        }
                        let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round)
                        else {
                            warn!("Failed to retrieve the committee lookback for round {current_round}");
                            continue;
                        };
                        committee_lookback.is_quorum_threshold_reached(&authors)
                    };

                    if is_quorum_threshold_reached {
                        debug!("Quorum threshold reached for round {current_round}");
                        if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                            warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                        }
                    }
                }
            }
        });

        // Task: forward unconfirmed solutions to the worker they are assigned to.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // The checksum is part of the key used to pick the worker.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                // Process the solution in its own (detached) task and report the result through the callback.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers()[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // A failed send is deliberately ignored.
                    callback.send(result).ok();
                });
            }
        });

        // Task: forward unconfirmed transactions to the worker they are assigned to.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // The checksum is part of the key used to pick the worker.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                // Process the transaction in its own (detached) task and report the result through the callback.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers().get(worker_id as usize).expect("Invalid worker ID");
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // A failed send is deliberately ignored.
                    callback.send(result).ok();
                });
            }
        });
    }
1620
1621 fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1623 let is_expired = match &*self.proposed_batch.read() {
1626 ProposedBatchState::Certifying(proposal) => proposal.round() < self.current_round(),
1627 _ => false,
1628 };
1629 if is_expired {
1631 let old = std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None);
1633 if let ProposedBatchState::Certifying(proposal) = old {
1634 debug!("Cleared expired proposal for round {}", proposal.round());
1635 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1636 }
1637 }
1638 Ok(())
1639 }
1640
1641 async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1643 if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1645 let mut fast_forward_round = self.current_round();
1646 while fast_forward_round < next_round.saturating_sub(1) {
1648 fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1650 *self.proposed_batch.write() = ProposedBatchState::None;
1652 }
1653 }
1654
1655 let current_round = self.current_round();
1657 if current_round < next_round {
1659 let is_ready = if let Some(cb) = self.primary_callback.get() {
1661 cb.try_advance_to_next_round(current_round)
1662 }
1663 else {
1665 self.storage.increment_to_next_round(current_round)?;
1667 true
1669 };
1670
1671 if is_ready && self.is_synced() {
1673 debug!("Primary is ready to propose the next round");
1674 self.proposal_task.signal();
1675 } else {
1676 debug!("Primary is not ready to propose the next round");
1677 }
1678 }
1679 Ok(())
1680 }
1681
1682 fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1686 let current_round = self.current_round();
1688 if current_round + self.storage.max_gc_rounds() <= batch_round {
1690 bail!("Round {batch_round} is too far in the future")
1691 }
1692 if current_round > batch_round + 1 {
1696 bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1697 }
1698 if let ProposedBatchState::Certifying(proposal) = &*self.proposed_batch.read()
1700 && proposal.round() > batch_round
1701 {
1702 bail!("Our primary at round {} is no longer signing for round {batch_round}", proposal.round())
1703 }
1704 Ok(())
1705 }
1706
1707 fn check_peer_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1710 ensure!(author != self.gateway.account().address(), "Peer cannot propose a batch that is authored by myself");
1711
1712 let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1714 Some(certificate) => certificate.timestamp(),
1716 None => return Ok(()),
1718 };
1719
1720 let elapsed = timestamp
1722 .checked_sub(previous_timestamp)
1723 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1724 match elapsed < MIN_BATCH_DELAY.as_secs() as i64 {
1726 true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1727 false => Ok(()),
1728 }
1729 }
1730
1731 fn check_own_proposal_timestamp(
1739 &self,
1740 previous_round: u64,
1741 previous_timestamp: i64,
1742 timestamp: i64,
1743 ) -> Result<bool> {
1744 let elapsed = timestamp
1746 .checked_sub(previous_timestamp)
1747 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1748
1749 Ok(elapsed >= MIN_BATCH_DELAY.as_secs() as i64)
1750 }
1751
    /// Turns the given proposal into a certificate, stores it, and broadcasts it to the network.
    ///
    /// After the certificate is stored, the proposed batch is cleared, the primary callback (if any)
    /// is given the new certificate, the certificate is broadcast, and the round-increment task is notified.
    ///
    /// Returns an error if the certificate cannot be created or stored, or if the callback rejects it.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Build the certificate inside `block_in_place`.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;

        // Storage expects the transmissions as a `HashMap`.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        // Capture the values needed for logging before the certificate is moved.
        let round = certificate.round();
        let num_transmissions = certificate.transmission_ids().len();

        // Store the certificate through `spawn_blocking!`.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // The proposal is now certified, so the proposed batch slot is cleared.
        *self.proposed_batch.write() = ProposedBatchState::None;

        // Hand the certificate to the primary callback, if one is registered.
        if let Some(cb) = self.primary_callback.get() {
            cb.add_new_certificate(certificate.clone()).await.with_context(|| {
                format!("Failed to insert our newly certified batch for round {round} into the DAG")
            })?;
        }
        // Broadcast the certificate to the network.
        self.gateway.broadcast(Event::BatchCertified(certificate.into()));

        info!("Our batch with {num_transmissions} transmissions for round {round} was certified!");

        // Record the time elapsed since the stored proposal start, if any (metrics builds only).
        #[cfg(feature = "metrics")]
        if let Some(start) = self.batch_propose_start.lock().take() {
            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
        }

        // Wake up the round-increment task, as a new certificate is now in storage.
        self.round_increment_notify.notify_one();

        Ok(())
    }
1797
1798 fn insert_missing_transmissions_into_workers(
1800 &self,
1801 peer_ip: SocketAddr,
1802 transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1803 ) -> Result<()> {
1804 assign_to_workers(self.workers(), transmissions, |worker, transmission_id, transmission| {
1806 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1807 })
1808 }
1809
1810 fn reinsert_transmissions_into_workers(
1812 &self,
1813 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1814 ) -> Result<()> {
1815 assign_to_workers(self.workers(), transmissions.into_iter(), |worker, transmission_id, transmission| {
1817 worker.reinsert(transmission_id, transmission);
1818 })
1819 }
1820
    /// Syncs storage with the given certificate from a peer, and stores the certificate.
    ///
    /// Certificates at or below the garbage collection round, or already in storage, are skipped.
    /// When `IS_SYNCING` is `false`, the call fails if the node is not synced.
    ///
    /// This function is recursive: syncing with the batch header may in turn sync with the
    /// previous certificates referenced by that header.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Certificates at or below the garbage collection round are ignored.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // Certificates already in storage need no further work.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // Outside of syncing, certificates are only processed by a synced node.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync with the batch header, which yields the transmissions that were missing from storage.
        let missing_transmissions =
            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

        // Check again: the certificate may have been stored while the call above was awaited.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the certificate through `spawn_blocking!`.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // Hand the certificate to the primary callback, if one is registered.
            if let Some(cb) = self.primary_callback.get() {
                cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
            }
            // Wake up the round-increment task, as a new certificate is now in storage.
            self.round_increment_notify.notify_one();
        }
        Ok(())
    }
1877
1878 async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1880 &self,
1881 peer_ip: SocketAddr,
1882 batch_header: &BatchHeader<N>,
1883 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1884 let batch_round = batch_header.round();
1886
1887 if batch_round <= self.storage.gc_round() {
1889 bail!("Round {batch_round} is too far in the past")
1890 }
1891
1892 if !IS_SYNCING && !self.is_synced() {
1894 bail!(
1895 "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1896 fmt_id(batch_header.batch_id())
1897 );
1898 }
1899
1900 let is_quorum_threshold_reached = {
1902 let authors = self.storage.get_certificate_authors_for_round(batch_round);
1903 let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1904 committee_lookback.is_quorum_threshold_reached(&authors)
1905 };
1906
1907 let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1912 let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1914 if is_behind_schedule || is_peer_far_in_future {
1916 self.try_increment_to_the_next_round(batch_round)
1918 .await
1919 .with_context(|| "Failed to fast forward current round")?;
1920 }
1921
1922 let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1924
1925 let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1927
1928 let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1930 missing_transmissions_handle,
1931 missing_previous_certificates_handle,
1932 ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1933
1934 let futures = missing_previous_certificates.into_iter().map(|batch_certificate| {
1938 let self_ = self.clone();
1939 async move {
1940 if CHECK_PREVIOUS_CERTIFICATES {
1945 self_.storage.check_incoming_certificate(&batch_certificate)?;
1946 }
1947 self_.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await
1949 }
1950 });
1951
1952 let mut latest_error = None;
1954 for result in join_all(futures).await.into_iter() {
1955 if let Err(err) = result {
1956 let err = err.context("Failed to fetch previous certificate");
1957 error!("{}", flatten_error(&err));
1958 latest_error = Some(err);
1959 }
1960 }
1961
1962 if let Some(err) = latest_error { Err(err) } else { Ok(missing_transmissions) }
1963 }
1964
1965 async fn fetch_missing_transmissions(
1968 &self,
1969 peer_ip: SocketAddr,
1970 batch_header: &BatchHeader<N>,
1971 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1972 if batch_header.round() <= self.storage.gc_round() {
1974 return Ok(Default::default());
1975 }
1976
1977 if self.storage.contains_batch(batch_header.batch_id()) {
1979 trace!("Batch for round {} from peer has already been processed", batch_header.round());
1980 return Ok(Default::default());
1981 }
1982
1983 let workers = self.workers.clone();
1985
1986 let mut fetch_transmissions = FuturesUnordered::new();
1988
1989 let num_workers = self.num_workers();
1991 for transmission_id in batch_header.transmission_ids() {
1993 if !self.storage.contains_transmission(*transmission_id) {
1995 let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1997 bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1998 };
1999 let Some(worker) = workers.get().expect("No workers set").get(worker_id as usize) else {
2001 bail!("Unable to find worker {worker_id}")
2002 };
2003 fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
2005 }
2006 }
2007
2008 let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
2010 while let Some(result) = fetch_transmissions.next().await {
2012 let (transmission_id, transmission) = result?;
2014 transmissions.insert(transmission_id, transmission);
2016 }
2017 Ok(transmissions)
2019 }
2020
2021 async fn fetch_missing_previous_certificates(
2023 &self,
2024 peer_ip: SocketAddr,
2025 batch_header: &BatchHeader<N>,
2026 ) -> Result<HashSet<BatchCertificate<N>>> {
2027 let round = batch_header.round();
2029 if round == 1 || round <= self.storage.gc_round() + 1 {
2031 return Ok(Default::default());
2032 }
2033
2034 let missing_previous_certificates =
2036 self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
2037 if !missing_previous_certificates.is_empty() {
2038 debug!(
2039 "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
2040 missing_previous_certificates.len(),
2041 );
2042 }
2043 Ok(missing_previous_certificates)
2045 }
2046
2047 async fn fetch_missing_certificates(
2049 &self,
2050 peer_ip: SocketAddr,
2051 round: u64,
2052 certificate_ids: &IndexSet<Field<N>>,
2053 ) -> Result<HashSet<BatchCertificate<N>>> {
2054 let mut fetch_certificates = FuturesUnordered::new();
2056 let mut missing_certificates = HashSet::default();
2058 for certificate_id in certificate_ids {
2060 if self.ledger.contains_certificate(certificate_id)? {
2062 continue;
2063 }
2064 if self.storage.contains_certificate(*certificate_id) {
2066 continue;
2067 }
2068 if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
2070 missing_certificates.insert(certificate);
2071 } else {
2072 trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
2074 fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
2077 }
2078 }
2079
2080 match fetch_certificates.is_empty() {
2082 true => return Ok(missing_certificates),
2083 false => trace!(
2084 "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
2085 fetch_certificates.len(),
2086 ),
2087 }
2088
2089 while let Some(result) = fetch_certificates.next().await {
2091 missing_certificates.insert(result?);
2093 }
2094 Ok(missing_certificates)
2096 }
2097}
2098
2099impl<N: Network> Primary<N> {
2100 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2102 self.handles.lock().push(tokio::spawn(future));
2103 }
2104
    /// Shuts down the primary.
    ///
    /// Clears the primary callback, shuts down the sync module and the workers, aborts the tasks
    /// tracked in `handles`, stores the proposal cache in the node data directory, and finally
    /// shuts down the gateway. A failure to store the proposal cache is logged, not returned.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Clear the primary callback.
        self.primary_callback.clear();
        // Shut down the sync module.
        self.sync.shut_down().await;
        // Shut down every worker.
        self.workers().iter().for_each(|worker| worker.shut_down());
        // Abort all tasks that were spawned through `spawn`.
        self.handles.lock().drain(..).for_each(|handle| handle.abort());
        // Assemble the proposal cache from the current state.
        let proposal_cache = {
            // Take the proposal out of the slot; only a proposal that is being certified is kept.
            let proposal = match std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None) {
                ProposedBatchState::Certifying(p) => Some(*p),
                _ => None,
            };
            let signed_proposals = self.signed_proposals.read().clone();
            // Use the round of the proposal if there is one; otherwise the round of the latest
            // proposal timestamp, or 0 if there is none.
            let latest_round = proposal
                .as_ref()
                .map(Proposal::round)
                .unwrap_or(self.latest_proposal_timestamp.read().await.map(|(round, _)| round).unwrap_or(0));
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        // Store the proposal cache; a failure is logged and shutdown continues.
        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
        }
        // Shut down the gateway last.
        self.gateway.shut_down().await;
    }
2139}
2140
2141#[cfg(test)]
2142mod tests {
2143 use super::{proposal_task::BatchPropose as _, *};
2144
2145 use snarkos_node_bft_ledger_service::MockLedgerService;
2146 use snarkos_node_bft_storage_service::BFTMemoryService;
2147 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2148 use snarkvm::{
2149 ledger::{
2150 committee::{Committee, MIN_VALIDATOR_STAKE},
2151 test_helpers::sample_execution_transaction_with_fee,
2152 },
2153 prelude::{Address, Signature},
2154 };
2155
2156 use bytes::Bytes;
2157 use indexmap::IndexSet;
2158 use rand::RngExt;
2159
2160 type CurrentNetwork = snarkvm::prelude::MainnetV0;
2161
2162 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2163 const COMMITTEE_SIZE: usize = 4;
2165 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2166 let mut members = IndexMap::new();
2167
2168 for i in 0..COMMITTEE_SIZE {
2169 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2170 let account = Account::new(rng).unwrap();
2171
2172 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.random_range(0..100)));
2173 accounts.push((socket_addr, account));
2174 }
2175
2176 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2177 }
2178
2179 fn primary_with_committee(
2181 account_index: usize,
2182 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2183 committee: Committee<CurrentNetwork>,
2184 height: u32,
2185 ) -> Primary<CurrentNetwork> {
2186 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2187 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
2188
2189 let account = accounts[account_index].1.clone();
2191 let block_sync = Arc::new(BlockSync::new(ledger.clone(), ConnectionMode::Gateway));
2192 let primary =
2193 Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2194 .unwrap();
2195
2196 let worker = Worker::new(
2198 0, Arc::new(primary.gateway.clone()),
2200 primary.storage.clone(),
2201 primary.ledger.clone(),
2202 primary.proposed_batch.clone(),
2203 )
2204 .unwrap();
2205 let _ = primary.workers.set(vec![worker]);
2206 for a in accounts.iter().skip(account_index) {
2207 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2208 }
2209
2210 primary
2211 }
2212
2213 fn primary_without_handlers(
2214 rng: &mut TestRng,
2215 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2216 let (accounts, committee) = sample_committee(rng);
2217 let primary = primary_with_committee(
2218 0, &accounts,
2220 committee,
2221 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2222 );
2223
2224 (primary, accounts)
2225 }
2226
2227 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2229 let solution_id = rng.random::<u64>().into();
2231 let size = rng.random_range(1024..10 * 1024);
2233 let vec: Vec<u8> = (0..size).map(|_| rng.random::<u8>()).collect();
2235 let solution = Data::Buffer(Bytes::from(vec));
2236 (solution_id, solution)
2238 }
2239
2240 fn sample_unconfirmed_transaction(
2242 rng: &mut TestRng,
2243 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2244 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2245 let id = transaction.id();
2246
2247 (id, Data::Object(transaction))
2248 }
2249
2250 fn create_test_proposal(
2252 author: &Account<CurrentNetwork>,
2253 committee: Committee<CurrentNetwork>,
2254 round: u64,
2255 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2256 timestamp: i64,
2257 num_transactions: u64,
2258 rng: &mut TestRng,
2259 ) -> Proposal<CurrentNetwork> {
2260 let mut transmission_ids = IndexSet::new();
2261 let mut transmissions = IndexMap::new();
2262
2263 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2265 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2266 let solution_transmission_id = (solution_id, solution_checksum).into();
2267 transmission_ids.insert(solution_transmission_id);
2268 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2269
2270 for _ in 0..num_transactions {
2272 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2273 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2274 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2275 transmission_ids.insert(transaction_transmission_id);
2276 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2277 }
2278
2279 let private_key = author.private_key();
2281 let batch_header = BatchHeader::new(
2283 private_key,
2284 round,
2285 timestamp,
2286 committee.id(),
2287 transmission_ids,
2288 previous_certificate_ids,
2289 rng,
2290 )
2291 .unwrap();
2292 Proposal::new(committee, batch_header, transmissions).unwrap()
2294 }
2295
2296 fn peer_signatures_for_proposal(
2299 primary: &Primary<CurrentNetwork>,
2300 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2301 rng: &mut TestRng,
2302 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2303 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2305 for (socket_addr, account) in accounts {
2306 if account.address() == primary.gateway.account().address() {
2307 continue;
2308 }
2309 let batch_id = primary.proposed_batch.read().as_proposal().unwrap().batch_id();
2310 let signature = account.sign(&[batch_id], rng).unwrap();
2311 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2312 }
2313
2314 signatures
2315 }
2316
2317 fn peer_signatures_for_batch(
2319 primary_address: Address<CurrentNetwork>,
2320 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2321 batch_id: Field<CurrentNetwork>,
2322 rng: &mut TestRng,
2323 ) -> IndexSet<Signature<CurrentNetwork>> {
2324 let mut signatures = IndexSet::new();
2325 for (_, account) in accounts {
2326 if account.address() == primary_address {
2327 continue;
2328 }
2329 let signature = account.sign(&[batch_id], rng).unwrap();
2330 signatures.insert(signature);
2331 }
2332 signatures
2333 }
2334
2335 fn create_batch_certificate(
2337 primary_address: Address<CurrentNetwork>,
2338 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2339 round: u64,
2340 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2341 rng: &mut TestRng,
2342 ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2343 let timestamp = now();
2344
2345 let author =
2346 accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2347 let private_key = author.private_key();
2348
2349 let committee_id = Field::rand(rng);
2350 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2351 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2352 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2353 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2354
2355 let solution_transmission_id = (solution_id, solution_checksum).into();
2356 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2357
2358 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2359 let transmissions = [
2360 (solution_transmission_id, Transmission::Solution(solution)),
2361 (transaction_transmission_id, Transmission::Transaction(transaction)),
2362 ]
2363 .into();
2364
2365 let batch_header = BatchHeader::new(
2366 private_key,
2367 round,
2368 timestamp,
2369 committee_id,
2370 transmission_ids,
2371 previous_certificate_ids,
2372 rng,
2373 )
2374 .unwrap();
2375 let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2376 let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2377 (certificate, transmissions)
2378 }
2379
2380 fn store_certificate_chain(
2382 primary: &Primary<CurrentNetwork>,
2383 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2384 round: u64,
2385 rng: &mut TestRng,
2386 ) -> IndexSet<Field<CurrentNetwork>> {
2387 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2388 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2389 for cur_round in 1..round {
2390 for (_, account) in accounts.iter() {
2391 let (certificate, transmissions) = create_batch_certificate(
2392 account.address(),
2393 accounts,
2394 cur_round,
2395 previous_certificates.clone(),
2396 rng,
2397 );
2398 next_certificates.insert(certificate.id());
2399 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2400 }
2401
2402 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2403 previous_certificates = next_certificates;
2404 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2405 }
2406
2407 previous_certificates
2408 }
2409
2410 fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2413 for (addr, acct) in accounts.iter().skip(1) {
2415 primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2416 }
2417 }
2418
2419 #[test_log::test(tokio::test)]
2420 async fn test_propose_batch() {
2421 let mut rng = TestRng::default();
2422 let (primary, _) = primary_without_handlers(&mut rng);
2423
2424 assert!(primary.proposed_batch.read().is_none());
2426
2427 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2429 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2430
2431 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2433 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2434
2435 assert!(primary.propose_batch().await.is_ok());
2437 assert!(primary.proposed_batch.read().is_proposed());
2438 }
2439
2440 #[test_log::test(tokio::test)]
2441 async fn test_propose_batch_with_no_transmissions() {
2442 let mut rng = TestRng::default();
2443 let (primary, _) = primary_without_handlers(&mut rng);
2444
2445 assert!(primary.proposed_batch.read().is_none());
2447
2448 assert!(primary.propose_batch().await.is_ok());
2450 assert!(primary.proposed_batch.read().is_proposed());
2451 }
2452
2453 #[test_log::test(tokio::test)]
2454 async fn test_propose_batch_in_round() {
2455 let round = 3;
2456 let mut rng = TestRng::default();
2457 let (primary, accounts) = primary_without_handlers(&mut rng);
2458
2459 store_certificate_chain(&primary, &accounts, round, &mut rng);
2461
2462 tokio::time::sleep(MIN_BATCH_DELAY).await;
2464
2465 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2467 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2468
2469 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2471 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2472
2473 assert!(primary.propose_batch().await.is_ok());
2475 assert!(primary.proposed_batch.read().is_proposed());
2476 }
2477
2478 #[test_log::test(tokio::test)]
2479 async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2480 let round = 3;
2481 let prev_round = round - 1;
2482 let mut rng = TestRng::default();
2483 let (primary, accounts) = primary_without_handlers(&mut rng);
2484 let peer_account = &accounts[1];
2485 let peer_ip = peer_account.0;
2486
2487 store_certificate_chain(&primary, &accounts, round, &mut rng);
2489
2490 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2492
2493 let mut num_transmissions_in_previous_round = 0;
2495
2496 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2498 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2499 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2500 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2501
2502 primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2504 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2505
2506 assert_eq!(primary.workers()[0].num_transmissions(), 2);
2508
2509 for (_, account) in accounts.iter() {
2511 let (certificate, transmissions) = create_batch_certificate(
2512 account.address(),
2513 &accounts,
2514 round,
2515 previous_certificate_ids.clone(),
2516 &mut rng,
2517 );
2518
2519 for (transmission_id, transmission) in transmissions.iter() {
2521 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2522 }
2523
2524 num_transmissions_in_previous_round += transmissions.len();
2526 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2527 }
2528
2529 tokio::time::sleep(MIN_BATCH_DELAY).await;
2531
2532 assert!(primary.storage.increment_to_next_round(round).is_ok());
2534
2535 assert_eq!(primary.workers()[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2537
2538 assert!(primary.propose_batch().await.is_ok());
2540
2541 let proposed_transmissions = primary.proposed_batch.read().as_proposal().unwrap().transmissions().clone();
2543 assert_eq!(proposed_transmissions.len(), 2);
2544 assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2545 assert!(
2546 proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2547 );
2548 }
2549
2550 #[test_log::test(tokio::test)]
2551 async fn test_propose_batch_over_spend_limit() {
2552 let mut rng = TestRng::default();
2553
2554 let (accounts, committee) = sample_committee(&mut rng);
2556 let primary = primary_with_committee(
2557 0,
2558 &accounts,
2559 committee.clone(),
2560 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2561 );
2562
2563 assert!(primary.proposed_batch.read().is_none());
2565 primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2567
2568 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2570 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2571
2572 for _i in 0..5 {
2573 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2574 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2576 }
2577
2578 assert!(primary.propose_batch().await.is_ok());
2580 assert_eq!(primary.proposed_batch.read().as_proposal().unwrap().transmissions().len(), 3);
2582 assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2584 }
2585
2586 #[test_log::test(tokio::test)]
2587 async fn test_batch_propose_from_peer() {
2588 let mut rng = TestRng::default();
2589 let (primary, accounts) = primary_without_handlers(&mut rng);
2590
2591 let round = 1;
2593 let peer_account = &accounts[1];
2594 let peer_ip = peer_account.0;
2595 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2596 let proposal = create_test_proposal(
2597 &peer_account.1,
2598 primary.ledger.current_committee().unwrap(),
2599 round,
2600 Default::default(),
2601 timestamp,
2602 1,
2603 &mut rng,
2604 );
2605
2606 for (transmission_id, transmission) in proposal.transmissions() {
2608 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2609 }
2610
2611 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2613
2614 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2617 primary.sync.testing_only_set_sync_height_testing_only(20);
2618
2619 assert!(
2621 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2622 );
2623 }
2624
2625 #[test_log::test(tokio::test)]
2626 async fn test_batch_propose_from_peer_when_not_synced() {
2627 let mut rng = TestRng::default();
2628 let (primary, accounts) = primary_without_handlers(&mut rng);
2629
2630 let round = 1;
2632 let peer_account = &accounts[1];
2633 let peer_ip = peer_account.0;
2634 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2635 let proposal = create_test_proposal(
2636 &peer_account.1,
2637 primary.ledger.current_committee().unwrap(),
2638 round,
2639 Default::default(),
2640 timestamp,
2641 1,
2642 &mut rng,
2643 );
2644
2645 for (transmission_id, transmission) in proposal.transmissions() {
2647 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2648 }
2649
2650 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2652
2653 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2655
2656 assert!(
2658 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2659 );
2660 }
2661
2662 #[test_log::test(tokio::test)]
2663 async fn test_batch_propose_from_peer_in_round() {
2664 let round = 2;
2665 let mut rng = TestRng::default();
2666 let (primary, accounts) = primary_without_handlers(&mut rng);
2667
2668 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2670
2671 let peer_account = &accounts[1];
2673 let peer_ip = peer_account.0;
2674 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2675 let proposal = create_test_proposal(
2676 &peer_account.1,
2677 primary.ledger.current_committee().unwrap(),
2678 round,
2679 previous_certificates,
2680 timestamp,
2681 1,
2682 &mut rng,
2683 );
2684
2685 for (transmission_id, transmission) in proposal.transmissions() {
2687 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2688 }
2689
2690 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2692
2693 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2696 primary.sync.testing_only_set_sync_height_testing_only(20);
2697
2698 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2700 }
2701
2702 #[test_log::test(tokio::test)]
2703 async fn test_batch_propose_from_peer_wrong_round() {
2704 let mut rng = TestRng::default();
2705 let (primary, accounts) = primary_without_handlers(&mut rng);
2706
2707 let round = 1;
2709 let peer_account = &accounts[1];
2710 let peer_ip = peer_account.0;
2711 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2712 let proposal = create_test_proposal(
2713 &peer_account.1,
2714 primary.ledger.current_committee().unwrap(),
2715 round,
2716 Default::default(),
2717 timestamp,
2718 1,
2719 &mut rng,
2720 );
2721
2722 for (transmission_id, transmission) in proposal.transmissions() {
2724 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2725 }
2726
2727 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2729 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2731 primary.sync.testing_only_set_sync_height_testing_only(20);
2732
2733 assert!(
2735 primary
2736 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2737 round: round + 1,
2738 batch_header: Data::Object(proposal.batch_header().clone())
2739 })
2740 .await
2741 .is_err()
2742 );
2743 }
2744
2745 #[test_log::test(tokio::test)]
2746 async fn test_batch_propose_from_peer_in_round_wrong_round() {
2747 let round = 4;
2748 let mut rng = TestRng::default();
2749 let (primary, accounts) = primary_without_handlers(&mut rng);
2750
2751 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2753
2754 let peer_account = &accounts[1];
2756 let peer_ip = peer_account.0;
2757 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2758 let proposal = create_test_proposal(
2759 &peer_account.1,
2760 primary.ledger.current_committee().unwrap(),
2761 round,
2762 previous_certificates,
2763 timestamp,
2764 1,
2765 &mut rng,
2766 );
2767
2768 for (transmission_id, transmission) in proposal.transmissions() {
2770 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2771 }
2772
2773 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2775 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2777 primary.sync.testing_only_set_sync_height_testing_only(0);
2778
2779 assert!(
2781 primary
2782 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2783 round: round + 1,
2784 batch_header: Data::Object(proposal.batch_header().clone())
2785 })
2786 .await
2787 .is_err()
2788 );
2789 }
2790
2791 #[test_log::test(tokio::test)]
2793 async fn test_batch_propose_from_peer_with_past_timestamp() {
2794 let round = 2;
2795 let mut rng = TestRng::default();
2796 let (primary, accounts) = primary_without_handlers(&mut rng);
2797
2798 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2800
2801 let peer_account = &accounts[1];
2803 let peer_ip = peer_account.0;
2804
2805 let last_timestamp = primary
2809 .storage
2810 .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2811 .expect("No previous proposal exists")
2812 .timestamp();
2813 let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY.as_secs() as i64) - 1;
2814
2815 let proposal = create_test_proposal(
2816 &peer_account.1,
2817 primary.ledger.current_committee().unwrap(),
2818 round,
2819 previous_certificates,
2820 invalid_timestamp,
2821 1,
2822 &mut rng,
2823 );
2824
2825 for (transmission_id, transmission) in proposal.transmissions() {
2827 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2828 }
2829
2830 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2832 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2834 primary.sync.testing_only_set_sync_height_testing_only(0);
2835
2836 assert!(
2838 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2839 );
2840 }
2841
2842 #[test_log::test(tokio::test)]
2843 async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2844 let round = 3;
2845 let mut rng = TestRng::default();
2846 let (primary, _) = primary_without_handlers(&mut rng);
2847
2848 assert!(primary.proposed_batch.read().is_none());
2850
2851 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2853 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2854
2855 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2857 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2858
2859 let (old_proposal_round, old_proposal_timestamp) = primary
2861 .latest_proposal_timestamp
2862 .read()
2863 .await
2864 .map(|(round, timestamp)| (round, timestamp))
2865 .unwrap_or((0, 0));
2866 *primary.latest_proposal_timestamp.write().await =
2867 Some((round + 1, old_proposal_timestamp + MIN_BATCH_DELAY.as_secs() as i64));
2868
2869 assert!(primary.propose_batch().await.is_ok());
2871 assert!(primary.proposed_batch.read().is_none());
2872
2873 *primary.latest_proposal_timestamp.write().await = Some((old_proposal_round, old_proposal_timestamp));
2875
2876 assert!(primary.propose_batch().await.is_ok());
2878 assert!(primary.proposed_batch.read().is_proposed());
2879 }
2880
2881 #[test_log::test(tokio::test)]
2882 async fn test_propose_batch_with_storage_round_behind_proposal() {
2883 let round = 5;
2884 let mut rng = TestRng::default();
2885 let (primary, accounts) = primary_without_handlers(&mut rng);
2886
2887 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2889
2890 let timestamp = now();
2892 let proposal = create_test_proposal(
2893 primary.gateway.account(),
2894 primary.ledger.current_committee().unwrap(),
2895 round + 1,
2896 previous_certificates,
2897 timestamp,
2898 1,
2899 &mut rng,
2900 );
2901
2902 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2904
2905 assert!(primary.propose_batch().await.is_ok());
2907 assert!(primary.proposed_batch.read().is_proposed());
2908 assert!(primary.proposed_batch.read().as_proposal().unwrap().round() > primary.current_round());
2909 }
2910
2911 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2912 async fn test_batch_signature_from_peer() {
2913 let mut rng = TestRng::default();
2914 let (primary, accounts) = primary_without_handlers(&mut rng);
2915 map_account_addresses(&primary, &accounts);
2916
2917 let round = 1;
2919 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2920 let proposal = create_test_proposal(
2921 primary.gateway.account(),
2922 primary.ledger.current_committee().unwrap(),
2923 round,
2924 Default::default(),
2925 timestamp,
2926 1,
2927 &mut rng,
2928 );
2929
2930 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2932
2933 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2935
2936 for (socket_addr, signature) in signatures {
2938 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2939 }
2940
2941 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2943 primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2945 assert_eq!(primary.current_round(), round + 1);
2947 }
2948
2949 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2950 async fn test_batch_signature_from_peer_in_round() {
2951 let round = 5;
2952 let mut rng = TestRng::default();
2953 let (primary, accounts) = primary_without_handlers(&mut rng);
2954 map_account_addresses(&primary, &accounts);
2955
2956 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2958
2959 let timestamp = now();
2961 let proposal = create_test_proposal(
2962 primary.gateway.account(),
2963 primary.ledger.current_committee().unwrap(),
2964 round,
2965 previous_certificates,
2966 timestamp,
2967 1,
2968 &mut rng,
2969 );
2970
2971 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2973
2974 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2976
2977 for (socket_addr, signature) in signatures {
2979 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2980 }
2981
2982 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2984 primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2986 assert_eq!(primary.current_round(), round + 1);
2988 }
2989
2990 #[test_log::test(tokio::test)]
2991 async fn test_batch_signature_from_peer_no_quorum() {
2992 let mut rng = TestRng::default();
2993 let (primary, accounts) = primary_without_handlers(&mut rng);
2994 map_account_addresses(&primary, &accounts);
2995
2996 let round = 1;
2998 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2999 let proposal = create_test_proposal(
3000 primary.gateway.account(),
3001 primary.ledger.current_committee().unwrap(),
3002 round,
3003 Default::default(),
3004 timestamp,
3005 1,
3006 &mut rng,
3007 );
3008
3009 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
3011
3012 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3014
3015 let (socket_addr, signature) = signatures.first().unwrap();
3017 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3018
3019 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3021 assert_eq!(primary.current_round(), round);
3023 }
3024
3025 #[test_log::test(tokio::test)]
3026 async fn test_batch_signature_from_peer_in_round_no_quorum() {
3027 let round = 7;
3028 let mut rng = TestRng::default();
3029 let (primary, accounts) = primary_without_handlers(&mut rng);
3030 map_account_addresses(&primary, &accounts);
3031
3032 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
3034
3035 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3037 let proposal = create_test_proposal(
3038 primary.gateway.account(),
3039 primary.ledger.current_committee().unwrap(),
3040 round,
3041 previous_certificates,
3042 timestamp,
3043 1,
3044 &mut rng,
3045 );
3046
3047 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
3049
3050 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3052
3053 let (socket_addr, signature) = signatures.first().unwrap();
3055 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3056
3057 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3059 assert_eq!(primary.current_round(), round);
3061 }
3062
3063 #[test_log::test(tokio::test)]
3068 async fn test_batch_signature_from_peer_batch_being_certified() {
3069 let mut rng = TestRng::default();
3070 let (primary, accounts) = primary_without_handlers(&mut rng);
3071 map_account_addresses(&primary, &accounts);
3072
3073 let round = 1;
3075 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3076 let proposal = create_test_proposal(
3077 primary.gateway.account(),
3078 primary.ledger.current_committee().unwrap(),
3079 round,
3080 Default::default(),
3081 timestamp,
3082 1,
3083 &mut rng,
3084 );
3085 let batch_id = proposal.batch_id();
3086
3087 *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id);
3089
3090 let (socket_addr, account) =
3092 accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3093 let signature = account.sign(&[batch_id], &mut rng).unwrap();
3094 let batch_signature = BatchSignature::new(batch_id, signature);
3095
3096 assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3098 assert!(matches!(&*primary.proposed_batch.read(), ProposedBatchState::Certified(id) if *id == batch_id));
3100 }
3101
3102 #[test_log::test(tokio::test)]
3105 async fn test_batch_signature_from_peer_unknown_id_while_certifying() {
3106 let mut rng = TestRng::default();
3107 let (primary, accounts) = primary_without_handlers(&mut rng);
3108 map_account_addresses(&primary, &accounts);
3109
3110 let round = 1;
3112 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3113 let proposal_a = create_test_proposal(
3114 primary.gateway.account(),
3115 primary.ledger.current_committee().unwrap(),
3116 round,
3117 Default::default(),
3118 timestamp,
3119 1,
3120 &mut rng,
3121 );
3122 let proposal_b = create_test_proposal(
3123 primary.gateway.account(),
3124 primary.ledger.current_committee().unwrap(),
3125 round,
3126 Default::default(),
3127 timestamp,
3128 1,
3129 &mut rng,
3130 );
3131 let batch_id_a = proposal_a.batch_id();
3132 let batch_id_b = proposal_b.batch_id();
3133 assert_ne!(batch_id_a, batch_id_b);
3134
3135 *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id_a);
3137
3138 let (socket_addr, account) =
3140 accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3141 let signature = account.sign(&[batch_id_b], &mut rng).unwrap();
3142 let batch_signature = BatchSignature::new(batch_id_b, signature);
3143
3144 assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_err());
3146 }
3147
3148 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3151 async fn test_batch_signature_from_peer_already_certified() {
3152 let mut rng = TestRng::default();
3153 let (primary, accounts) = primary_without_handlers(&mut rng);
3154 map_account_addresses(&primary, &accounts);
3155
3156 let round = 1;
3158 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3159 let old_proposal = create_test_proposal(
3160 primary.gateway.account(),
3161 primary.ledger.current_committee().unwrap(),
3162 round,
3163 Default::default(),
3164 timestamp,
3165 1,
3166 &mut rng,
3167 );
3168 let old_batch_id = old_proposal.batch_id();
3169 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(old_proposal));
3170 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3171 for (socket_addr, signature) in signatures {
3172 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
3173 }
3174 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3176
3177 let new_proposal = create_test_proposal(
3179 primary.gateway.account(),
3180 primary.ledger.current_committee().unwrap(),
3181 round,
3182 Default::default(),
3183 timestamp,
3184 1,
3185 &mut rng,
3186 );
3187 assert_ne!(new_proposal.batch_id(), old_batch_id);
3188 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(new_proposal));
3189
3190 let (socket_addr, account) =
3192 accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3193 let signature = account.sign(&[old_batch_id], &mut rng).unwrap();
3194 let batch_signature = BatchSignature::new(old_batch_id, signature);
3195
3196 assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3198 }
3199
3200 #[test_log::test(tokio::test)]
3201 async fn test_insert_certificate_with_aborted_transmissions() {
3202 let round = 3;
3203 let prev_round = round - 1;
3204 let mut rng = TestRng::default();
3205 let (primary, accounts) = primary_without_handlers(&mut rng);
3206 let peer_account = &accounts[1];
3207 let peer_ip = peer_account.0;
3208
3209 store_certificate_chain(&primary, &accounts, round, &mut rng);
3211
3212 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
3214
3215 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
3217 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
3218
3219 primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
3221 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
3222
3223 assert_eq!(primary.workers()[0].num_transmissions(), 2);
3225
3226 let account = accounts[0].1.clone();
3228 let (certificate, transmissions) =
3229 create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3230 let certificate_id = certificate.id();
3231
3232 let mut aborted_transmissions = HashSet::new();
3234 let mut transmissions_without_aborted = HashMap::new();
3235 for (transmission_id, transmission) in transmissions.clone() {
3236 match rng.random::<bool>() || aborted_transmissions.is_empty() {
3237 true => {
3238 aborted_transmissions.insert(transmission_id);
3240 }
3241 false => {
3242 transmissions_without_aborted.insert(transmission_id, transmission);
3244 }
3245 };
3246 }
3247
3248 for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3250 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3251 }
3252
3253 assert!(
3255 primary
3256 .storage
3257 .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3258 .is_err()
3259 );
3260 assert!(
3261 primary
3262 .storage
3263 .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3264 .is_err()
3265 );
3266
3267 primary
3269 .storage
3270 .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3271 .unwrap();
3272
3273 assert!(primary.storage.contains_certificate(certificate_id));
3275 for aborted_transmission_id in aborted_transmissions {
3277 assert!(primary.storage.contains_transmission(aborted_transmission_id));
3278 assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3279 }
3280 }
3281
/// With no batch on record (`ProposedBatchState::None`), `add_signature_to_batch` must return
/// an error and hand back the `None` state.
#[test]
fn test_add_signature_to_batch_none_state() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Sign a random batch ID with the second account.
    let (peer_ip, peer_account) = (accounts[1].0, &accounts[1].1);
    let batch_id = Field::rand(&mut rng);
    let peer_signature = peer_account.sign(&[batch_id], &mut rng).unwrap();

    let (outcome, state_after) =
        primary.add_signature_to_batch(ProposedBatchState::None, peer_ip, batch_id, peer_signature);

    // The signature is rejected and the state is unchanged.
    assert!(outcome.is_err());
    assert_eq!(state_after, ProposedBatchState::None);
}
3302
/// With the state `Certified(batch_id)`, a signature for that same batch ID must yield
/// `Ok(None)` and leave the state as `Certified(batch_id)`.
#[test]
fn test_add_signature_to_batch_certified_matching_id() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Sign a random batch ID with the second account.
    let (peer_ip, peer_account) = (accounts[1].0, &accounts[1].1);
    let batch_id = Field::rand(&mut rng);
    let peer_signature = peer_account.sign(&[batch_id], &mut rng).unwrap();

    let (outcome, state_after) =
        primary.add_signature_to_batch(ProposedBatchState::Certified(batch_id), peer_ip, batch_id, peer_signature);

    // The signature is accepted without producing a proposal, and the state is unchanged.
    assert!(outcome.unwrap().is_none());
    assert_eq!(state_after, ProposedBatchState::Certified(batch_id));
}
3319
/// With the state `Certified(certified_id)`, a signature for a different batch ID must be
/// rejected and the state must stay `Certified(certified_id)`.
#[test]
fn test_add_signature_to_batch_certified_different_id() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Draw two random batch IDs and sign the second one with the second account.
    let (peer_ip, peer_account) = (accounts[1].0, &accounts[1].1);
    let certified_id = Field::rand(&mut rng);
    let other_id = Field::rand(&mut rng);
    let peer_signature = peer_account.sign(&[other_id], &mut rng).unwrap();

    let (outcome, state_after) =
        primary.add_signature_to_batch(ProposedBatchState::Certified(certified_id), peer_ip, other_id, peer_signature);

    // The signature is rejected and the state is unchanged.
    assert!(outcome.is_err());
    assert_eq!(state_after, ProposedBatchState::Certified(certified_id));
}
3337
3338 #[tokio::test(flavor = "multi_thread")]
3341 async fn test_add_signature_to_batch_certifying_different_id_in_storage() {
3342 let round = 1;
3343 let mut rng = TestRng::default();
3344 let (primary, accounts) = primary_without_handlers(&mut rng);
3345 map_account_addresses(&primary, &accounts);
3346
3347 let proposal = create_test_proposal(
3349 primary.gateway.account(),
3350 primary.ledger.current_committee().unwrap(),
3351 round,
3352 Default::default(),
3353 now(),
3354 0,
3355 &mut rng,
3356 );
3357 let proposal_batch_id = proposal.batch_id();
3358
3359 let (certificate, transmissions) =
3361 create_batch_certificate(accounts[1].1.address(), &accounts, round, Default::default(), &mut rng);
3362 let stored_batch_id = certificate.batch_id();
3363 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
3364
3365 let peer_ip = accounts[1].0;
3366 let signature = accounts[1].1.sign(&[stored_batch_id], &mut rng).unwrap();
3367
3368 let (result, new_state) = primary.add_signature_to_batch(
3369 ProposedBatchState::Certifying(Box::new(proposal)),
3370 peer_ip,
3371 stored_batch_id,
3372 signature,
3373 );
3374
3375 assert!(result.unwrap().is_none());
3376 assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id);
3378 }
3379
/// While certifying one proposal, a signature for a random, unrelated batch ID must be
/// rejected and the certifying proposal must stay in place.
#[test]
fn test_add_signature_to_batch_certifying_different_id_unknown() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Build the proposal that is being certified.
    let committee = primary.ledger.current_committee().unwrap();
    let certifying_proposal =
        create_test_proposal(primary.gateway.account(), committee, 1, Default::default(), now(), 0, &mut rng);
    let proposal_batch_id = certifying_proposal.batch_id();

    // Sign a random batch ID with the second account.
    let (peer_ip, peer_account) = (accounts[1].0, &accounts[1].1);
    let unknown_id = Field::rand(&mut rng);
    let peer_signature = peer_account.sign(&[unknown_id], &mut rng).unwrap();

    let certifying_state = ProposedBatchState::Certifying(Box::new(certifying_proposal));
    let (outcome, state_after) = primary.add_signature_to_batch(certifying_state, peer_ip, unknown_id, peer_signature);

    // The signature is rejected and the proposal is still held.
    assert!(outcome.is_err());
    assert_eq!(state_after.as_proposal().unwrap().batch_id(), proposal_batch_id);
}
3412
/// While certifying a proposal, a single matching signature from one peer must yield
/// `Ok(None)` and keep the proposal in the returned state.
#[test]
fn test_add_signature_to_batch_certifying_matching_no_quorum() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);
    map_account_addresses(&primary, &accounts);

    // Build the proposal that is being certified.
    let committee = primary.ledger.current_committee().unwrap();
    let certifying_proposal =
        create_test_proposal(primary.gateway.account(), committee, 1, Default::default(), now(), 0, &mut rng);
    let batch_id = certifying_proposal.batch_id();

    // Sign the proposal's batch ID with the second account only.
    let (peer_ip, peer_account) = (accounts[1].0, &accounts[1].1);
    let peer_signature = peer_account.sign(&[batch_id], &mut rng).unwrap();

    let certifying_state = ProposedBatchState::Certifying(Box::new(certifying_proposal));
    let (outcome, state_after) = primary.add_signature_to_batch(certifying_state, peer_ip, batch_id, peer_signature);

    // The signature is accepted, but no proposal is handed back and the proposal is still held.
    assert!(outcome.unwrap().is_none());
    assert_eq!(state_after.as_proposal().unwrap().batch_id(), batch_id);
}
3445
/// Adds matching peer signatures one at a time until `add_signature_to_batch` hands back the
/// proposal, then expects that proposal to carry the original batch ID and the state to have
/// become `Certified(batch_id)`.
#[test]
fn test_add_signature_to_batch_certifying_matching_quorum_reached() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);
    map_account_addresses(&primary, &accounts);

    // Build the proposal that is being certified.
    let committee = primary.ledger.current_committee().unwrap();
    let certifying_proposal =
        create_test_proposal(primary.gateway.account(), committee, 1, Default::default(), now(), 0, &mut rng);
    let batch_id = certifying_proposal.batch_id();

    let mut state = ProposedBatchState::Certifying(Box::new(certifying_proposal));
    let mut certified_proposal = None;

    // Feed in signatures from every account other than the primary, threading the state through.
    let own_address = primary.gateway.account().address();
    for (peer_ip, peer_account) in accounts.iter().filter(|(_, account)| account.address() != own_address) {
        let peer_signature = peer_account.sign(&[batch_id], &mut rng).unwrap();
        let (outcome, next_state) = primary.add_signature_to_batch(state, *peer_ip, batch_id, peer_signature);
        state = next_state;

        // Stop as soon as a proposal is handed back.
        if let Some(proposal) = outcome.unwrap() {
            certified_proposal = Some(proposal);
            break;
        }
    }

    // A proposal was handed back, and the state now records the batch as certified.
    let proposal = certified_proposal.expect("quorum should be reached");
    assert_eq!(proposal.batch_id(), batch_id);
    assert_eq!(state, ProposedBatchState::Certified(batch_id));
}
3486}