1mod proposal_task;
17pub use proposal_task::ProposalTask;
18
19use crate::{
20 Gateway,
21 MAX_BATCH_DELAY,
22 MAX_LEADER_CERTIFICATE_DELAY,
23 MAX_WORKERS,
24 MIN_BATCH_DELAY,
25 PRIMARY_PING_INTERVAL,
26 Sync,
27 Transport,
28 WORKER_PING_INTERVAL,
29 Worker,
30 events::{BatchPropose, BatchSignature, Event},
31 helpers::{
32 PrimaryReceiver,
33 PrimarySender,
34 Proposal,
35 ProposalCache,
36 SignedProposals,
37 Storage,
38 assign_to_worker,
39 assign_to_workers,
40 fmt_id,
41 init_sync_channels,
42 init_worker_channels,
43 now,
44 },
45 spawn_blocking,
46 sync::SyncCallback,
47};
48
49use snarkos_account::Account;
50use snarkos_node_bft_events::PrimaryPing;
51use snarkos_node_bft_ledger_service::LedgerService;
52#[cfg(test)]
53use snarkos_node_network::ConnectionMode;
54use snarkos_node_network::PeerPoolHandling;
55use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
56use snarkos_utilities::{CallbackHandle, NodeDataDir};
57
58use snarkvm::{
59 console::{
60 prelude::*,
61 types::{Address, Field},
62 },
63 ledger::{
64 block::Transaction,
65 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
66 puzzle::{Solution, SolutionID},
67 },
68 prelude::{Signature, committee::Committee},
69 utilities::flatten_error,
70};
71
72use anyhow::Context;
73use colored::Colorize;
74use futures::stream::{FuturesUnordered, StreamExt};
75use indexmap::{IndexMap, IndexSet};
76#[cfg(feature = "locktick")]
77use locktick::{
78 parking_lot::{Mutex, RwLock},
79 tokio::RwLock as TRwLock,
80};
81#[cfg(not(feature = "locktick"))]
82use parking_lot::{Mutex, RwLock};
83#[cfg(not(feature = "serial"))]
84use rayon::prelude::*;
85use std::{
86 collections::{HashMap, HashSet},
87 future::Future,
88 net::SocketAddr,
89 pin::Pin,
90 sync::{Arc, OnceLock},
91 time::Instant,
92};
93#[cfg(not(feature = "locktick"))]
94use tokio::sync::RwLock as TRwLock;
95use tokio::{sync::Notify, task::JoinHandle};
96
/// The state of this primary's own batch proposal.
#[derive(Debug, PartialEq, Eq)]
pub enum ProposedBatchState<N: Network> {
    /// There is no proposed batch.
    None,
    /// A batch has been proposed and is still being certified (i.e., it is collecting signatures).
    Certifying(Box<Proposal<N>>),
    /// The proposal with this batch ID reached the quorum threshold of signatures.
    /// Only the batch ID is kept; the proposal itself has been handed off for certification.
    Certified(Field<N>),
}
108
109impl<N: Network> Default for ProposedBatchState<N> {
110 fn default() -> Self {
111 Self::None
112 }
113}
114
115impl<N: Network> ProposedBatchState<N> {
116 pub fn is_none(&self) -> bool {
118 matches!(self, Self::None)
119 }
120
121 pub fn is_proposed(&self) -> bool {
123 matches!(self, Self::Certifying(_))
124 }
125
126 pub fn as_proposal(&self) -> Option<&Proposal<N>> {
128 match self {
129 Self::Certifying(p) => Some(p.as_ref()),
130 _ => None,
131 }
132 }
133}
134
/// A [`ProposedBatchState`] guarded by a (synchronous) read-write lock.
pub type ProposedBatch<N> = RwLock<ProposedBatchState<N>>;
137
/// Callbacks that the primary invokes on an (optional) higher-level component.
///
/// NOTE(review): the implementor is not visible in this part of the file; presumably it is the BFT layer - confirm.
#[async_trait::async_trait]
pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync {
    /// Asks the callback to advance past `current_round`.
    ///
    /// `Primary::propose_batch` calls this when storage already contains this primary's own
    /// certificate for the round it was about to propose for.
    fn try_advance_to_next_round(&self, current_round: u64) -> bool;

    /// Hands a batch certificate to the callback.
    ///
    /// NOTE(review): the call sites are outside the visible part of this file - confirm when this is invoked.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
}
154
/// The primary: proposes batches, signs batches proposed by peers, and processes batch certificates.
///
/// Cloning is cheap-ish by design: the mutable state below lives behind `Arc`s, so clones share it.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module (constructed in `new` from the gateway, storage, ledger, and block sync).
    sync: Sync<N>,
    /// The gateway, used to send and broadcast events and to resolve peer addresses.
    gateway: Gateway<N>,
    /// The storage for certificates, batches, and transmissions.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers; set exactly once in `run`.
    workers: Arc<OnceLock<Vec<Worker<N>>>>,

    /// The optional callback, set in `run` if one is provided.
    primary_callback: Arc<CallbackHandle<Arc<dyn PrimaryCallback<N>>>>,

    /// The state of this primary's own batch proposal (also shared with each worker).
    proposed_batch: Arc<ProposedBatch<N>>,

    /// The instant at which the latest batch was proposed (only tracked with the `metrics` feature).
    #[cfg(feature = "metrics")]
    batch_propose_start: Arc<Mutex<Option<Instant>>>,

    /// The `(round, timestamp)` of the latest proposal, if any.
    /// `propose_batch` holds the write lock on this field for its entire duration.
    latest_proposal_timestamp: Arc<TRwLock<Option<(u64, i64)>>>,

    /// The proposals of other validators that this primary signed,
    /// keyed by batch author with the `(round, batch ID, signature)` of the signed batch.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,

    /// Join handles of background tasks.
    /// NOTE(review): presumably populated by `Self::spawn`, which is outside the visible part of this file.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,

    /// The node's data directory, used to locate the proposal cache (and passed to the gateway).
    node_data_dir: NodeDataDir,

    /// The proposal task (see the `proposal_task` module).
    proposal_task: ProposalTask<N>,

    /// Notified when a peer's certificate completes quorum for a round at or beyond the current one.
    round_increment_notify: Arc<Notify>,
}
202
203impl<N: Network> Primary<N> {
    /// Twice the maximum number of transmissions allowed in a single batch.
    /// NOTE(review): the consumers of this constant are outside the visible part of this file.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
206
207 #[allow(clippy::too_many_arguments)]
209 pub fn new(
210 account: Account<N>,
211 storage: Storage<N>,
212 ledger: Arc<dyn LedgerService<N>>,
213 block_sync: Arc<BlockSync<N>>,
214 ip: Option<SocketAddr>,
215 trusted_validators: &[SocketAddr],
216 trusted_peers_only: bool,
217 node_data_dir: NodeDataDir,
218 dev: Option<u16>,
219 ) -> Result<Self> {
220 let gateway = Gateway::new(
222 account,
223 storage.clone(),
224 ledger.clone(),
225 ip,
226 trusted_validators,
227 trusted_peers_only,
228 node_data_dir.clone(),
229 dev,
230 )?;
231 let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
233
234 Ok(Self {
236 sync,
237 gateway,
238 storage,
239 ledger,
240 node_data_dir,
241 workers: Default::default(),
242 primary_callback: Default::default(),
243 proposed_batch: Default::default(),
244 #[cfg(feature = "metrics")]
245 batch_propose_start: Default::default(),
246 latest_proposal_timestamp: Default::default(),
247 signed_proposals: Default::default(),
248 handles: Default::default(),
249 proposal_task: Default::default(),
250 round_increment_notify: Default::default(),
251 })
252 }
253
254 async fn load_proposal_cache(&self) -> Result<()> {
256 match ProposalCache::<N>::exists(&self.node_data_dir) {
258 true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
260 Ok(proposal_cache) => {
261 let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
263 proposal_cache.into();
264
265 *self.latest_proposal_timestamp.write().await = Some((latest_certificate_round, now()));
266 *self.proposed_batch.write() = match proposed_batch {
267 Some(p) => ProposedBatchState::Certifying(Box::new(p)),
268 None => ProposedBatchState::None,
269 };
270 *self.signed_proposals.write() = signed_proposals;
271
272 for certificate in pending_certificates {
274 let batch_id = certificate.batch_id();
275 if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
279 {
280 let err = err.context(format!(
281 "Failed to load stored certificate {} from proposal cache",
282 fmt_id(batch_id)
283 ));
284 warn!("{}", &flatten_error(err));
285 }
286 }
287 Ok(())
288 }
289 Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
290 },
291 false => Ok(()),
293 }
294 }
295
296 pub async fn run(
298 &self,
299 ping: Option<Arc<Ping<N>>>,
300 primary_callback: Option<Arc<dyn PrimaryCallback<N>>>,
301 sync_callback: Option<Arc<dyn SyncCallback<N>>>,
302 primary_sender: PrimarySender<N>,
303 primary_receiver: PrimaryReceiver<N>,
304 ) -> Result<()> {
305 info!("Starting the primary instance of the memory pool...");
306
307 if let Some(callback) = primary_callback {
309 self.primary_callback.set(callback)?;
310 }
311
312 let mut worker_senders = IndexMap::new();
314 let mut workers = Vec::new();
316 for id in 0..MAX_WORKERS {
318 let (tx_worker, rx_worker) = init_worker_channels();
320 let worker = Worker::new(
322 id,
323 Arc::new(self.gateway.clone()),
324 self.storage.clone(),
325 self.ledger.clone(),
326 self.proposed_batch.clone(),
327 )?;
328 worker.run(rx_worker);
330 workers.push(worker);
332 worker_senders.insert(id, tx_worker);
334 }
335 if self.workers.set(workers).is_err() {
337 bail!("Workers already set. `Primary::run` cannot be called more than once.");
338 }
339
340 let (sync_sender, sync_receiver) = init_sync_channels();
342 self.sync.initialize(sync_callback)?;
344 self.load_proposal_cache().await?;
346 self.sync.run(ping, sync_receiver).await?;
348 self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
350 self.start_handlers(primary_receiver);
353
354 Ok(())
355 }
356
    /// Returns the current round, as tracked by the storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }
361
    /// Returns `true` if the sync module reports that this node is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }
366
    /// Returns a reference to the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }
371
    /// Returns a reference to the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }
376
    /// Returns a reference to the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
381
382 pub fn num_workers(&self) -> u8 {
384 u8::try_from(self.workers.get().expect("Primary is not running yet").len()).expect("Too many workers")
385 }
386
    /// Returns the workers.
    ///
    /// # Panics
    /// Panics if the workers have not been set yet (they are set in `run`).
    pub fn workers(&self) -> &[Worker<N>] {
        self.workers.get().expect("Primary is not running yet")
    }
391}
392
393impl<N: Network> Primary<N> {
394 pub fn num_unconfirmed_transmissions(&self) -> usize {
396 self.workers().iter().map(|worker| worker.num_transmissions()).sum()
397 }
398
399 pub fn num_unconfirmed_ratifications(&self) -> usize {
401 self.workers().iter().map(|worker| worker.num_ratifications()).sum()
402 }
403
404 pub fn num_unconfirmed_solutions(&self) -> usize {
406 self.workers().iter().map(|worker| worker.num_solutions()).sum()
407 }
408
409 pub fn num_unconfirmed_transactions(&self) -> usize {
411 self.workers().iter().map(|worker| worker.num_transactions()).sum()
412 }
413}
414
415impl<N: Network> Primary<N> {
416 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
418 self.workers().iter().flat_map(|worker| worker.transmission_ids())
419 }
420
421 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
423 self.workers().iter().flat_map(|worker| worker.transmissions())
424 }
425
426 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
428 self.workers().iter().flat_map(|worker| worker.solutions())
429 }
430
431 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
433 self.workers().iter().flat_map(|worker| worker.transactions())
434 }
435}
436
437impl<N: Network> Primary<N> {
438 pub fn clear_worker_solutions(&self) {
440 self.workers().iter().for_each(Worker::clear_solutions);
441 }
442}
443
444#[async_trait::async_trait]
445impl<N: Network> proposal_task::BatchPropose for Primary<N> {
446 fn current_round(&self) -> u64 {
447 Primary::current_round(self)
448 }
449
    /// Delegates to the sync module's `wait_for_synced_if_syncing`.
    ///
    /// NOTE(review): presumably returns `Some(future)` (resolving once synced) only while the node
    /// is syncing, and `None` otherwise - confirm against `Sync::wait_for_synced_if_syncing`.
    fn wait_for_synced_if_syncing(&self) -> Option<futures::future::BoxFuture<'_, ()>> {
        self.sync.wait_for_synced_if_syncing()
    }
453
454 fn is_synced(&self) -> bool {
455 self.sync.is_synced()
456 }
457
    /// Attempts to propose a new batch for the current round.
    ///
    /// Returns `Ok(true)` if a new batch was created, broadcast, and stored as the proposed batch.
    /// Returns `Ok(false)` whenever proposing is skipped, i.e. when:
    /// - the expiration check on the existing proposal fails,
    /// - the current round is behind the latest proposal round,
    /// - a proposal already exists (an uncertified one is re-sent to its non-signers),
    /// - the own-proposal timestamp check does not pass,
    /// - this primary already has a certificate for the round, or already proposed for it,
    /// - too few validators are connected, or the previous round has not reached quorum.
    ///
    /// # Errors
    /// Returns an error if the round is 0, if a committee lookback cannot be retrieved, if a
    /// transaction cannot be deserialized, or if the batch header or proposal cannot be constructed
    /// (in the last case, the drained transmissions are reinserted into the workers first).
    async fn propose_batch(&self) -> Result<bool> {
        // Acquire the write lock on the latest proposal round/timestamp.
        // The guard is held until this function returns, so concurrent calls are serialized.
        let mut lock_guard = self.latest_proposal_timestamp.write().await;

        // Check the existing proposal for expiration; a failure here is logged and is not fatal.
        if let Err(err) = self
            .check_proposed_batch_for_expiration()
            .with_context(|| "Failed to check the proposed batch for expiration")
        {
            warn!("{}", flatten_error(&err));
            return Ok(false);
        }

        // Retrieve the current round and the one before it.
        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // Do not propose for a round that is behind the latest round we proposed for
        // (this value may also have been restored from the proposal cache).
        if let Some((latest_round, _)) = &*lock_guard
            && round < *latest_round
        {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {latest_round}");
            return Ok(false);
        }

        // If a proposal already exists, do not create a new one.
        match &*self.proposed_batch.read() {
            ProposedBatchState::Certifying(proposal) => {
                // The existing proposal is ahead of storage if its round is in the future,
                // or if any of its previous certificates is missing from storage.
                if round < proposal.round()
                    || proposal
                        .batch_header()
                        .previous_certificate_ids()
                        .iter()
                        .any(|id| !self.storage.contains_certificate(*id))
                {
                    warn!(
                        "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                        proposal.round(),
                    );
                    return Ok(false);
                }
                // Re-send the existing proposal to every connected validator that has not signed it yet.
                let event = Event::BatchPropose(proposal.batch_header().clone().into());
                for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                    match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                        Some(peer_ip) => {
                            let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                            // Send in a separate task, so that this function does not wait on the peer.
                            tokio::spawn(async move {
                                debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                                if gateway.send(peer_ip, event_).await.is_none() {
                                    warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                                }
                            });
                        }
                        // No known IP for this validator; skip it.
                        None => continue,
                    }
                }
                debug!("Proposed batch for round {} is still valid", proposal.round());
                return Ok(false);
            }
            ProposedBatchState::Certified(_) => {
                debug!("Cannot propose a batch for round {round} - a batch is currently being certified");
                return Ok(false);
            }
            ProposedBatchState::None => {
                // No existing proposal; continue on to create a new one.
            }
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Check the timestamp of our latest proposal against the current time.
        if let Some((_, latest_timestamp)) = &*lock_guard
            && !self.check_own_proposal_timestamp(previous_round, *latest_timestamp, now())?
        {
            return Ok(false);
        }

        // If we already have a certificate for this round, there is nothing to propose.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // Give the callback (if any) a chance to advance to the next round instead.
            if let Some(cb) = &*self.primary_callback.get_ref() {
                match cb.try_advance_to_next_round(self.current_round()) {
                    true => (),
                    false => return Ok(false),
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(false);
        }

        // Do not propose twice for the same round.
        if let Some((latest_round, _)) = &*lock_guard
            && *latest_round == round
        {
            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(false);
        }

        // Ensure that we are connected to a quorum of validators (counting ourselves).
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            let mut connected_validators = self.gateway.connected_addresses();
            connected_validators.insert(self.gateway.account().address());
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                // Subtract one, so that our own address is not counted as a connection.
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(false);
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // The first round has no previous certificates to wait for.
        // Otherwise, the previous round's certificates must reach quorum before we propose.
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
            // On test networks, rounds up to the dev committee's starting round are always ready.
            #[cfg(feature = "test_network")]
            {
                if let Some(dev_committee) = self.ledger.dev_committee_for_round(previous_round)? {
                    if round <= dev_committee.starting_round() {
                        is_ready = true;
                    }
                }
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(false);
        }

        // The transmissions selected for this proposal, and their accumulated cost.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        // NOTE(review): presumably the selection logic below was written with a single worker in mind - confirm.
        debug_assert_eq!(MAX_WORKERS, 1);

        // Drain transmissions from the front of each worker's queue.
        // Transmissions that are skipped via `continue` are not put back into the worker.
        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Stop entirely once the batch is full; put the transmission back first.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Move on to the next worker once this worker's share is used up.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Skip transmissions that are already in the ledger.
                // A failed lookup is treated as "already in the ledger".
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Skip transmissions that are already in storage.
                // This check is not applied to the first selected transmission.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Validate the transmission according to its kind.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // The checksum in the ID must match the checksum of the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // The checksum in the ID must match the checksum of the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction (if needed) off the async executor,
                        // reading at most the maximum transaction size.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(
                                    &mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64),
                                )?),
                            }
                        })?;

                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;

                        // Compute the cost of the transaction; it counts towards the batch spend limit.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Guard against overflow when accumulating the proposal cost.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Stop selecting once the spend limit would be exceeded;
                        // the transaction is put back, so it can be proposed in a later batch.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        proposal_cost = next_proposal_cost;
                    }

                    // Ratifications are never included in a proposal.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // The ID kind and the transmission kind do not match; skip.
                    _ => continue,
                }

                // The transmission passed all checks; select it.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Record the round and timestamp of this proposal before constructing it.
        *lock_guard = Some((round, current_timestamp));
        let private_key = *self.gateway.account().private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Construct (and sign) the batch header off the async executor, then build the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On failure, return the drained transmissions to the workers, so that they are not lost.
            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
            }
        })?;

        // Broadcast the batch header to the validators, and store the proposal for certification.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        *self.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
        #[cfg(feature = "metrics")]
        {
            *self.batch_propose_start.lock() = Some(Instant::now());
        }

        Ok(true)
    }
831}
832
833impl<N: Network> Primary<N> {
    /// Processes a batch proposal received from a peer, and signs it if it is valid.
    ///
    /// The proposal is validated in several stages: the sender must be the batch author and an
    /// authorized validator, the committee ID must match ours, the proposal must not conflict with
    /// a batch we already signed for this author, and the batch header must pass the storage checks.
    /// If all checks pass, the batch ID is signed, the signature is recorded in `signed_proposals`,
    /// and the signature is sent back to the peer.
    ///
    /// Returns `Ok(())` both when the batch was signed and when the proposal was skipped without
    /// fault (e.g. it is already in storage, or we are not in the signing round).
    ///
    /// # Errors
    /// Returns an error if the proposal is invalid or malicious (the peer is disconnected in the
    /// malicious cases), or if a lookup, sync, or signing step fails.
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header off the async executor.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // The round announced in the event must match the round in the batch header.
        if batch_round != batch_header.round() {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        let batch_author = batch_header.author();

        // The peer must be the author of the batch.
        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            Some(address) => {
                if address != batch_author {
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // The author must be an authorized validator.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // We never sign our own proposal through this path.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // The committee ID in the batch header must match the committee lookback for the round.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // Compare against the batch we most recently signed for this author, if any.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // Reject proposals for a round older than the one we already signed.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // Reject a second, different batch for the same round.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // The same batch was proposed again; re-send the signature we already produced.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                return Ok(());
            }
        }

        // Nothing to do if the batch is already in storage.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Check the timestamp of the proposal; a failure is treated as malicious behavior.
        let previous_round = batch_round.saturating_sub(1);
        if let Err(err) = self.check_peer_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            self.gateway.disconnect(peer_ip);
            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
        }

        // Ratifications are not supported in proposals.
        if batch_header.contains(TransmissionID::Ratification) {
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
            );
        }

        // Sync with the batch header, obtaining the transmissions that we were missing.
        let mut missing_transmissions =
            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;

        // Ensure that every missing transmission is well-formed.
        // An invalid transmission is logged, and the proposal is dropped without an error.
        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            let err = err.context(format!(
                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
            ));
            debug!("{}", flatten_error(err));
            return Ok(());
        }

        // Ensure that we are in the round for which the batch should be signed;
        // otherwise the proposal is dropped without an error.
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        let (storage, header) = (self.storage.clone(), batch_header.clone());

        // Check the batch header against storage off the async executor.
        // A `None` result means that there is nothing to sign.
        let Some(missing_transmissions) =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
        else {
            return Ok(());
        };

        // Hand the missing transmissions to the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // Sign the batch ID off the async executor.
        let batch_id = batch_header.batch_id();
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::rng()))?;

        // Record the signed proposal.
        // The entry is re-checked under the write lock, since another task may have signed
        // a batch for this author and round in the meantime; in that case, do not sign again.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                entry.insert((batch_round, batch_id, signature));
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Send the signature to the peer in a separate task.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }
1040
1041 fn add_signature_to_batch(
1052 &self,
1053 state: ProposedBatchState<N>,
1054 peer_ip: SocketAddr,
1055 batch_id: Field<N>,
1056 signature: Signature<N>,
1057 ) -> (Result<Option<Proposal<N>>>, ProposedBatchState<N>) {
1058 match state {
1059 ProposedBatchState::Certifying(mut proposal) if proposal.batch_id() == batch_id => {
1060 let inner: Result<bool> = (|| {
1063 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1064 let Some(signer) = self.gateway.resolve_to_aleo_addr(peer_ip) else {
1065 bail!("Signature is from a disconnected validator");
1066 };
1067 let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1068 if new_signature {
1069 info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1070 Ok(proposal.is_quorum_threshold_reached(&committee_lookback))
1071 } else {
1072 debug!(
1073 "Received duplicated signature from '{peer_ip}' for batch \
1074 {batch_id} in round {round}",
1075 round = proposal.round()
1076 );
1077 Ok(false)
1078 }
1079 })();
1080 match inner {
1081 Ok(true) => {
1082 let certified_id = proposal.batch_id();
1083 (Ok(Some(*proposal)), ProposedBatchState::Certified(certified_id))
1084 }
1085 Ok(false) => (Ok(None), ProposedBatchState::Certifying(proposal)),
1086 Err(e) => (Err(e), ProposedBatchState::Certifying(proposal)),
1087 }
1088 }
1089 ProposedBatchState::Certifying(proposal) => {
1090 if self.storage.contains_batch(batch_id) {
1092 debug!(
1093 "Primary is safely skipping a batch signature from {peer_ip} for \
1094 round {} - batch is already certified",
1095 proposal.round()
1096 );
1097 (Ok(None), ProposedBatchState::Certifying(proposal))
1098 } else {
1099 let expected_id = proposal.batch_id();
1100 let round = proposal.round();
1101 (
1102 Err(anyhow!("Unknown batch ID '{batch_id}', expected '{expected_id}' for round {round}")),
1103 ProposedBatchState::Certifying(proposal),
1104 )
1105 }
1106 }
1107 ProposedBatchState::Certified(id) if id == batch_id => {
1108 debug!(
1110 "Skipping batch signature from {peer_ip} for batch '{batch_id}' - \
1111 already received sufficient signatures"
1112 );
1113 (Ok(None), ProposedBatchState::Certified(id))
1114 }
1115 ProposedBatchState::Certified(id) => {
1116 let result = if self.storage.contains_batch(batch_id) {
1117 warn!("Received signature for an older batch {batch_id}");
1119 Ok(None)
1120 } else {
1121 Err(anyhow!("Unknown batch ID '{batch_id}'"))
1122 };
1123
1124 (result, ProposedBatchState::Certified(id))
1125 }
1126 ProposedBatchState::None => {
1127 let result = if self.storage.contains_batch(batch_id) {
1128 warn!("Received signature for an older batch {batch_id}");
1130 Ok(None)
1131 } else {
1132 Err(anyhow!("Unknown batch ID '{batch_id}'"))
1133 };
1134
1135 (result, ProposedBatchState::None)
1136 }
1137 }
1138 }
1139
1140 async fn process_batch_signature_from_peer(
1149 &self,
1150 peer_ip: SocketAddr,
1151 batch_signature: BatchSignature<N>,
1152 ) -> Result<()> {
1153 self.check_proposed_batch_for_expiration()?;
1155
1156 let BatchSignature { batch_id, signature } = batch_signature;
1158
1159 let signer = signature.to_address();
1161
1162 if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1164 self.gateway.disconnect(peer_ip);
1166 bail!("Malicious peer - batch signature is from a different validator ({signer})");
1167 }
1168 if self.gateway.account().address() == signer {
1170 bail!("Invalid peer - received a batch signature from myself ({signer})");
1171 }
1172
1173 let self_ = self.clone();
1174 let Some(proposal) = spawn_blocking!({
1175 let mut proposed_batch = self_.proposed_batch.write();
1177
1178 let (result, new_state) =
1179 self_.add_signature_to_batch(std::mem::take(&mut *proposed_batch), peer_ip, batch_id, signature);
1180 *proposed_batch = new_state;
1181 result
1182 })?
1183 else {
1184 return Ok(());
1185 };
1186
1187 info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1190
1191 let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1193 if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1196 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1198 return Err(e);
1199 }
1200
1201 #[cfg(feature = "metrics")]
1202 metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1203 Ok(())
1204 }
1205
1206 async fn process_batch_certificate_from_peer(
1213 &self,
1214 peer_ip: SocketAddr,
1215 certificate: BatchCertificate<N>,
1216 ) -> Result<()> {
1217 if !self.gateway.is_authorized_validator_ip(peer_ip) {
1219 self.gateway.disconnect(peer_ip);
1221 bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1222 }
1223 if self.storage.contains_certificate(certificate.id()) {
1225 return Ok(());
1226 } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1228 self.storage.insert_unprocessed_certificate(certificate.clone())?;
1229 }
1230
1231 let author = certificate.author();
1233 let certificate_round = certificate.round();
1235 let committee_id = certificate.committee_id();
1237
1238 if self.gateway.account().address() == author {
1240 bail!("Received a batch certificate for myself ({author})");
1241 }
1242
1243 self.storage.check_incoming_certificate(&certificate)?;
1245
1246 self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1258
1259 let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1264
1265 let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1267 let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1269
1270 let expected_committee_id = committee_lookback.id();
1272 if expected_committee_id != committee_id {
1273 self.gateway.disconnect(peer_ip);
1275 bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1276 }
1277
1278 let should_advance = match &*self.latest_proposal_timestamp.read().await {
1282 Some((latest_round, _)) => *latest_round < certificate_round,
1284 None => true,
1286 };
1287
1288 let current_round = self.current_round();
1290
1291 if is_quorum && should_advance && certificate_round >= current_round {
1293 self.round_increment_notify.notify_one();
1295 }
1296 Ok(())
1297 }
1298}
1299
1300impl<N: Network> Primary<N> {
1301 fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1310 let PrimaryReceiver {
1311 mut rx_batch_propose,
1312 mut rx_batch_signature,
1313 mut rx_batch_certified,
1314 mut rx_primary_ping,
1315 mut rx_unconfirmed_solution,
1316 mut rx_unconfirmed_transaction,
1317 } = primary_receiver;
1318
1319 let self_ = self.clone();
1321 self.spawn(async move {
1322 loop {
1323 tokio::time::sleep(PRIMARY_PING_INTERVAL).await;
1325
1326 let self__ = self_.clone();
1328 let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1329 Ok(block_locators) => block_locators,
1330 Err(e) => {
1331 warn!("Failed to retrieve block locators - {e}");
1332 continue;
1333 }
1334 };
1335
1336 let primary_certificate = {
1338 let primary_address = self_.gateway.account().address();
1340
1341 let mut certificate = None;
1343 let mut current_round = self_.current_round();
1344 while certificate.is_none() {
1345 if current_round == 0 {
1347 break;
1348 }
1349 if let Some(primary_certificate) =
1351 self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1352 {
1353 certificate = Some(primary_certificate);
1354 } else {
1356 current_round = current_round.saturating_sub(1);
1357 }
1358 }
1359
1360 match certificate {
1362 Some(certificate) => certificate,
1363 None => continue,
1365 }
1366 };
1367
1368 let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1370 self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1372 }
1373 });
1374
1375 let self_ = self.clone();
1377 self.spawn(async move {
1378 while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1379 if self_.sync.is_synced() {
1381 trace!("Processing new primary ping from '{peer_ip}'");
1382 } else {
1383 trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1384 continue;
1385 }
1386
1387 {
1389 let self_ = self_.clone();
1390 tokio::spawn(async move {
1391 let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1393 else {
1394 warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1395 return;
1396 };
1397 let id = fmt_id(primary_certificate.id());
1399 let round = primary_certificate.round();
1400 if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1401 warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1402 }
1403 });
1404 }
1405 }
1406 });
1407
1408 let self_ = self.clone();
1410 self.spawn(async move {
1411 loop {
1412 tokio::time::sleep(WORKER_PING_INTERVAL).await;
1413 if !self_.sync.is_synced() {
1415 trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1416 continue;
1417 }
1418 for worker in self_.workers() {
1420 worker.broadcast_ping();
1421 }
1422 }
1423 });
1424
1425 let proposal_task = self.proposal_task.clone();
1427 let self_ = self.clone();
1428 self.spawn(async move { proposal_task.run(self_).await });
1429
1430 let self_ = self.clone();
1432 self.spawn(async move {
1433 while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1434 if !self_.sync.is_synced() {
1436 trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1437 continue;
1438 }
1439
1440 let self_ = self_.clone();
1442 tokio::spawn(async move {
1443 let round = batch_propose.round;
1445 if let Err(err) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1446 let err = err.context(format!("Cannot sign a batch at round {round} from '{peer_ip}'"));
1447 warn!("{}", flatten_error(err));
1448 }
1449 });
1450 }
1451 });
1452
1453 let self_ = self.clone();
1455 self.spawn(async move {
1456 while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1457 if !self_.sync.is_synced() {
1459 trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1460 continue;
1461 }
1462 let id = fmt_id(batch_signature.batch_id);
1468 if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1469 let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
1470 warn!("{}", flatten_error(err));
1471 }
1472 }
1473 });
1474
1475 let self_ = self.clone();
1477 self.spawn(async move {
1478 while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1479 if !self_.sync.is_synced() {
1481 trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1482 continue;
1483 }
1484 let self_ = self_.clone();
1486 tokio::spawn(async move {
1487 let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1489 warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1490 return;
1491 };
1492 let id = fmt_id(batch_certificate.id());
1494 let round = batch_certificate.round();
1495 if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1496 warn!(
1497 "{}",
1498 flatten_error(err.context(format!(
1499 "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
1500 )))
1501 );
1502 }
1503 });
1504 }
1505 });
1506
1507 let self_ = self.clone();
1510 self.spawn(async move {
1511 loop {
1512 let round_start = Instant::now();
1513 let current_round = self_.current_round();
1514
1515 while self_.current_round() == current_round {
1517 let mut futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> =
1518 vec![Box::pin(self_.round_increment_notify.notified())];
1519
1520 if let Some(remaining_delay) = MAX_BATCH_DELAY.checked_sub(round_start.elapsed())
1521 && !remaining_delay.is_zero()
1522 {
1523 futures.push(Box::pin(tokio::time::sleep(remaining_delay)));
1524 }
1525 futures.push(Box::pin(tokio::time::sleep(MAX_LEADER_CERTIFICATE_DELAY)));
1530 if !self_.sync.is_synced() {
1531 futures.push(Box::pin(self_.sync.wait_for_synced()));
1532 }
1533 let _ = futures::future::select_all(futures).await;
1534
1535 if !self_.sync.is_synced() {
1536 trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1537 continue;
1538 }
1539
1540 let next_round = current_round.saturating_add(1);
1541 let is_quorum_threshold_reached = {
1542 let authors = self_.storage.get_certificate_authors_for_round(current_round);
1543 if authors.is_empty() {
1544 continue;
1545 }
1546 let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round)
1547 else {
1548 warn!("Failed to retrieve the committee lookback for round {current_round}");
1549 continue;
1550 };
1551 committee_lookback.is_quorum_threshold_reached(&authors)
1552 };
1553
1554 if is_quorum_threshold_reached {
1555 debug!("Quorum threshold reached for round {current_round}");
1556 if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
1557 warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
1558 }
1559 }
1560 }
1561 }
1562 });
1563
1564 let self_ = self.clone();
1566 self.spawn(async move {
1567 while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1568 let Ok(checksum) = solution.to_checksum::<N>() else {
1570 error!("Failed to compute the checksum for the unconfirmed solution");
1571 continue;
1572 };
1573 let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1575 error!("Unable to determine the worker ID for the unconfirmed solution");
1576 continue;
1577 };
1578 let self_ = self_.clone();
1579 tokio::spawn(async move {
1580 let worker = &self_.workers()[worker_id as usize];
1582 let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1584 callback.send(result).ok();
1586 });
1587 }
1588 });
1589
1590 let self_ = self.clone();
1592 self.spawn(async move {
1593 while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1594 trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1595 let Ok(checksum) = transaction.to_checksum::<N>() else {
1597 error!("Failed to compute the checksum for the unconfirmed transaction");
1598 continue;
1599 };
1600 let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1602 error!("Unable to determine the worker ID for the unconfirmed transaction");
1603 continue;
1604 };
1605 let self_ = self_.clone();
1606 tokio::spawn(async move {
1607 let worker = &self_.workers().get(worker_id as usize).expect("Invalid worker ID");
1609 let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1611 callback.send(result).ok();
1613 });
1614 }
1615 });
1616 }
1617
1618 fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1620 let is_expired = match &*self.proposed_batch.read() {
1623 ProposedBatchState::Certifying(proposal) => proposal.round() < self.current_round(),
1624 _ => false,
1625 };
1626 if is_expired {
1628 let old = std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None);
1630 if let ProposedBatchState::Certifying(proposal) = old {
1631 debug!("Cleared expired proposal for round {}", proposal.round());
1632 self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1633 }
1634 }
1635 Ok(())
1636 }
1637
1638 async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1640 if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1642 let mut fast_forward_round = self.current_round();
1643 while fast_forward_round < next_round.saturating_sub(1) {
1645 fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1647 *self.proposed_batch.write() = ProposedBatchState::None;
1649 }
1650 }
1651
1652 let current_round = self.current_round();
1654 if current_round < next_round {
1656 let is_ready = if let Some(cb) = self.primary_callback.get() {
1658 cb.try_advance_to_next_round(current_round)
1659 }
1660 else {
1662 self.storage.increment_to_next_round(current_round)?;
1664 true
1666 };
1667
1668 if is_ready && self.is_synced() {
1670 debug!("Primary is ready to propose the next round");
1671 self.proposal_task.signal();
1672 } else {
1673 debug!("Primary is not ready to propose the next round");
1674 }
1675 }
1676 Ok(())
1677 }
1678
1679 fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1683 let current_round = self.current_round();
1685 if current_round + self.storage.max_gc_rounds() <= batch_round {
1687 bail!("Round {batch_round} is too far in the future")
1688 }
1689 if current_round > batch_round + 1 {
1693 bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1694 }
1695 if let ProposedBatchState::Certifying(proposal) = &*self.proposed_batch.read()
1697 && proposal.round() > batch_round
1698 {
1699 bail!("Our primary at round {} is no longer signing for round {batch_round}", proposal.round())
1700 }
1701 Ok(())
1702 }
1703
1704 fn check_peer_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1707 ensure!(author != self.gateway.account().address(), "Peer cannot propose a batch that is authored by myself");
1708
1709 let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1711 Some(certificate) => certificate.timestamp(),
1713 None => return Ok(()),
1715 };
1716
1717 let elapsed = timestamp
1719 .checked_sub(previous_timestamp)
1720 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1721 match elapsed < MIN_BATCH_DELAY.as_secs() as i64 {
1723 true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1724 false => Ok(()),
1725 }
1726 }
1727
1728 fn check_own_proposal_timestamp(
1736 &self,
1737 previous_round: u64,
1738 previous_timestamp: i64,
1739 timestamp: i64,
1740 ) -> Result<bool> {
1741 let elapsed = timestamp
1743 .checked_sub(previous_timestamp)
1744 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1745
1746 Ok(elapsed >= MIN_BATCH_DELAY.as_secs() as i64)
1747 }
1748
1749 async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1751 let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1753
1754 let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1757
1758 let round = certificate.round();
1760 let num_transmissions = certificate.transmission_ids().len();
1761
1762 let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1764 spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1765 debug!("Stored a batch certificate for round {}", certificate.round());
1766 *self.proposed_batch.write() = ProposedBatchState::None;
1769
1770 if let Some(cb) = self.primary_callback.get() {
1772 cb.add_new_certificate(certificate.clone()).await.with_context(|| {
1774 format!("Failed to insert our newly certified batch for round {round} into the DAG")
1775 })?;
1776 }
1777 self.gateway.broadcast(Event::BatchCertified(certificate.into()));
1779
1780 info!("Our batch with {num_transmissions} transmissions for round {round} was certified!");
1782
1783 #[cfg(feature = "metrics")]
1785 if let Some(start) = self.batch_propose_start.lock().take() {
1786 metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
1787 }
1788
1789 self.round_increment_notify.notify_one();
1791
1792 Ok(())
1793 }
1794
1795 fn insert_missing_transmissions_into_workers(
1797 &self,
1798 peer_ip: SocketAddr,
1799 transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1800 ) -> Result<()> {
1801 assign_to_workers(self.workers(), transmissions, |worker, transmission_id, transmission| {
1803 worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1804 })
1805 }
1806
1807 fn reinsert_transmissions_into_workers(
1809 &self,
1810 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1811 ) -> Result<()> {
1812 assign_to_workers(self.workers(), transmissions.into_iter(), |worker, transmission_id, transmission| {
1814 worker.reinsert(transmission_id, transmission);
1815 })
1816 }
1817
1818 #[async_recursion::async_recursion]
1828 async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1829 &self,
1830 peer_ip: SocketAddr,
1831 certificate: BatchCertificate<N>,
1832 ) -> Result<()> {
1833 let batch_header = certificate.batch_header();
1835 let batch_round = batch_header.round();
1837
1838 if batch_round <= self.storage.gc_round() {
1840 return Ok(());
1841 }
1842 if self.storage.contains_certificate(certificate.id()) {
1844 return Ok(());
1845 }
1846
1847 if !IS_SYNCING && !self.is_synced() {
1849 bail!(
1850 "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1851 fmt_id(certificate.id())
1852 );
1853 }
1854
1855 let missing_transmissions =
1857 self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;
1858
1859 if !self.storage.contains_certificate(certificate.id()) {
1861 let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1863 spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1864 debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1865 if let Some(cb) = self.primary_callback.get() {
1867 cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
1868 }
1869 self.round_increment_notify.notify_one();
1871 }
1872 Ok(())
1873 }
1874
1875 async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1877 &self,
1878 peer_ip: SocketAddr,
1879 batch_header: &BatchHeader<N>,
1880 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1881 let batch_round = batch_header.round();
1883
1884 if batch_round <= self.storage.gc_round() {
1886 bail!("Round {batch_round} is too far in the past")
1887 }
1888
1889 if !IS_SYNCING && !self.is_synced() {
1891 bail!(
1892 "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1893 fmt_id(batch_header.batch_id())
1894 );
1895 }
1896
1897 let is_quorum_threshold_reached = {
1899 let authors = self.storage.get_certificate_authors_for_round(batch_round);
1900 let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1901 committee_lookback.is_quorum_threshold_reached(&authors)
1902 };
1903
1904 let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1909 let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1911 if is_behind_schedule || is_peer_far_in_future {
1913 self.try_increment_to_the_next_round(batch_round)
1915 .await
1916 .with_context(|| "Failed to fast forward current round")?;
1917 }
1918
1919 let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1921
1922 let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1924
1925 let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1927 missing_transmissions_handle,
1928 missing_previous_certificates_handle,
1929 ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1930
1931 for batch_certificate in missing_previous_certificates {
1935 if CHECK_PREVIOUS_CERTIFICATES {
1940 self.storage.check_incoming_certificate(&batch_certificate)?;
1941 }
1942 self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1944 }
1945 Ok(missing_transmissions)
1946 }
1947
1948 async fn fetch_missing_transmissions(
1951 &self,
1952 peer_ip: SocketAddr,
1953 batch_header: &BatchHeader<N>,
1954 ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1955 if batch_header.round() <= self.storage.gc_round() {
1957 return Ok(Default::default());
1958 }
1959
1960 if self.storage.contains_batch(batch_header.batch_id()) {
1962 trace!("Batch for round {} from peer has already been processed", batch_header.round());
1963 return Ok(Default::default());
1964 }
1965
1966 let workers = self.workers.clone();
1968
1969 let mut fetch_transmissions = FuturesUnordered::new();
1971
1972 let num_workers = self.num_workers();
1974 for transmission_id in batch_header.transmission_ids() {
1976 if !self.storage.contains_transmission(*transmission_id) {
1978 let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1980 bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1981 };
1982 let Some(worker) = workers.get().expect("No workers set").get(worker_id as usize) else {
1984 bail!("Unable to find worker {worker_id}")
1985 };
1986 fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1988 }
1989 }
1990
1991 let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1993 while let Some(result) = fetch_transmissions.next().await {
1995 let (transmission_id, transmission) = result?;
1997 transmissions.insert(transmission_id, transmission);
1999 }
2000 Ok(transmissions)
2002 }
2003
2004 async fn fetch_missing_previous_certificates(
2006 &self,
2007 peer_ip: SocketAddr,
2008 batch_header: &BatchHeader<N>,
2009 ) -> Result<HashSet<BatchCertificate<N>>> {
2010 let round = batch_header.round();
2012 if round == 1 || round <= self.storage.gc_round() + 1 {
2014 return Ok(Default::default());
2015 }
2016
2017 let missing_previous_certificates =
2019 self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
2020 if !missing_previous_certificates.is_empty() {
2021 debug!(
2022 "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
2023 missing_previous_certificates.len(),
2024 );
2025 }
2026 Ok(missing_previous_certificates)
2028 }
2029
2030 async fn fetch_missing_certificates(
2032 &self,
2033 peer_ip: SocketAddr,
2034 round: u64,
2035 certificate_ids: &IndexSet<Field<N>>,
2036 ) -> Result<HashSet<BatchCertificate<N>>> {
2037 let mut fetch_certificates = FuturesUnordered::new();
2039 let mut missing_certificates = HashSet::default();
2041 for certificate_id in certificate_ids {
2043 if self.ledger.contains_certificate(certificate_id)? {
2045 continue;
2046 }
2047 if self.storage.contains_certificate(*certificate_id) {
2049 continue;
2050 }
2051 if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
2053 missing_certificates.insert(certificate);
2054 } else {
2055 trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
2057 fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
2060 }
2061 }
2062
2063 match fetch_certificates.is_empty() {
2065 true => return Ok(missing_certificates),
2066 false => trace!(
2067 "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
2068 fetch_certificates.len(),
2069 ),
2070 }
2071
2072 while let Some(result) = fetch_certificates.next().await {
2074 missing_certificates.insert(result?);
2076 }
2077 Ok(missing_certificates)
2079 }
2080}
2081
2082impl<N: Network> Primary<N> {
2083 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2085 self.handles.lock().push(tokio::spawn(future));
2086 }
2087
2088 pub async fn shut_down(&self) {
2090 info!("Shutting down the primary...");
2091 self.primary_callback.clear();
2093 self.sync.shut_down().await;
2095 self.workers().iter().for_each(|worker| worker.shut_down());
2097 self.handles.lock().drain(..).for_each(|handle| handle.abort());
2099 let proposal_cache = {
2101 let proposal = match std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None) {
2105 ProposedBatchState::Certifying(p) => Some(*p),
2106 _ => None,
2107 };
2108 let signed_proposals = self.signed_proposals.read().clone();
2109 let latest_round = proposal
2110 .as_ref()
2111 .map(Proposal::round)
2112 .unwrap_or(self.latest_proposal_timestamp.read().await.map(|(round, _)| round).unwrap_or(0));
2113 let pending_certificates = self.storage.get_pending_certificates();
2114 ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2115 };
2116 if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2117 error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2118 }
2119 self.gateway.shut_down().await;
2121 }
2122}
2123
2124#[cfg(test)]
2125mod tests {
2126 use super::{proposal_task::BatchPropose as _, *};
2127
2128 use snarkos_node_bft_ledger_service::MockLedgerService;
2129 use snarkos_node_bft_storage_service::BFTMemoryService;
2130 use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2131 use snarkvm::{
2132 ledger::{
2133 committee::{Committee, MIN_VALIDATOR_STAKE},
2134 test_helpers::sample_execution_transaction_with_fee,
2135 },
2136 prelude::{Address, Signature},
2137 };
2138
2139 use bytes::Bytes;
2140 use indexmap::IndexSet;
2141 use rand::RngExt;
2142
2143 type CurrentNetwork = snarkvm::prelude::MainnetV0;
2144
2145 fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2146 const COMMITTEE_SIZE: usize = 4;
2148 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2149 let mut members = IndexMap::new();
2150
2151 for i in 0..COMMITTEE_SIZE {
2152 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2153 let account = Account::new(rng).unwrap();
2154
2155 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.random_range(0..100)));
2156 accounts.push((socket_addr, account));
2157 }
2158
2159 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2160 }
2161
2162 fn primary_with_committee(
2164 account_index: usize,
2165 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2166 committee: Committee<CurrentNetwork>,
2167 height: u32,
2168 ) -> Primary<CurrentNetwork> {
2169 let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2170 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
2171
2172 let account = accounts[account_index].1.clone();
2174 let block_sync = Arc::new(BlockSync::new(ledger.clone(), ConnectionMode::Gateway));
2175 let primary =
2176 Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2177 .unwrap();
2178
2179 let worker = Worker::new(
2181 0, Arc::new(primary.gateway.clone()),
2183 primary.storage.clone(),
2184 primary.ledger.clone(),
2185 primary.proposed_batch.clone(),
2186 )
2187 .unwrap();
2188 let _ = primary.workers.set(vec![worker]);
2189 for a in accounts.iter().skip(account_index) {
2190 primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2191 }
2192
2193 primary
2194 }
2195
2196 fn primary_without_handlers(
2197 rng: &mut TestRng,
2198 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2199 let (accounts, committee) = sample_committee(rng);
2200 let primary = primary_with_committee(
2201 0, &accounts,
2203 committee,
2204 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2205 );
2206
2207 (primary, accounts)
2208 }
2209
2210 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2212 let solution_id = rng.random::<u64>().into();
2214 let size = rng.random_range(1024..10 * 1024);
2216 let vec: Vec<u8> = (0..size).map(|_| rng.random::<u8>()).collect();
2218 let solution = Data::Buffer(Bytes::from(vec));
2219 (solution_id, solution)
2221 }
2222
2223 fn sample_unconfirmed_transaction(
2225 rng: &mut TestRng,
2226 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2227 let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2228 let id = transaction.id();
2229
2230 (id, Data::Object(transaction))
2231 }
2232
2233 fn create_test_proposal(
2235 author: &Account<CurrentNetwork>,
2236 committee: Committee<CurrentNetwork>,
2237 round: u64,
2238 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2239 timestamp: i64,
2240 num_transactions: u64,
2241 rng: &mut TestRng,
2242 ) -> Proposal<CurrentNetwork> {
2243 let mut transmission_ids = IndexSet::new();
2244 let mut transmissions = IndexMap::new();
2245
2246 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2248 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2249 let solution_transmission_id = (solution_id, solution_checksum).into();
2250 transmission_ids.insert(solution_transmission_id);
2251 transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2252
2253 for _ in 0..num_transactions {
2255 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2256 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2257 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2258 transmission_ids.insert(transaction_transmission_id);
2259 transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2260 }
2261
2262 let private_key = author.private_key();
2264 let batch_header = BatchHeader::new(
2266 private_key,
2267 round,
2268 timestamp,
2269 committee.id(),
2270 transmission_ids,
2271 previous_certificate_ids,
2272 rng,
2273 )
2274 .unwrap();
2275 Proposal::new(committee, batch_header, transmissions).unwrap()
2277 }
2278
2279 fn peer_signatures_for_proposal(
2282 primary: &Primary<CurrentNetwork>,
2283 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2284 rng: &mut TestRng,
2285 ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2286 let mut signatures = Vec::with_capacity(accounts.len() - 1);
2288 for (socket_addr, account) in accounts {
2289 if account.address() == primary.gateway.account().address() {
2290 continue;
2291 }
2292 let batch_id = primary.proposed_batch.read().as_proposal().unwrap().batch_id();
2293 let signature = account.sign(&[batch_id], rng).unwrap();
2294 signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2295 }
2296
2297 signatures
2298 }
2299
2300 fn peer_signatures_for_batch(
2302 primary_address: Address<CurrentNetwork>,
2303 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2304 batch_id: Field<CurrentNetwork>,
2305 rng: &mut TestRng,
2306 ) -> IndexSet<Signature<CurrentNetwork>> {
2307 let mut signatures = IndexSet::new();
2308 for (_, account) in accounts {
2309 if account.address() == primary_address {
2310 continue;
2311 }
2312 let signature = account.sign(&[batch_id], rng).unwrap();
2313 signatures.insert(signature);
2314 }
2315 signatures
2316 }
2317
2318 fn create_batch_certificate(
2320 primary_address: Address<CurrentNetwork>,
2321 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2322 round: u64,
2323 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2324 rng: &mut TestRng,
2325 ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2326 let timestamp = now();
2327
2328 let author =
2329 accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2330 let private_key = author.private_key();
2331
2332 let committee_id = Field::rand(rng);
2333 let (solution_id, solution) = sample_unconfirmed_solution(rng);
2334 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2335 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2336 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2337
2338 let solution_transmission_id = (solution_id, solution_checksum).into();
2339 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2340
2341 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2342 let transmissions = [
2343 (solution_transmission_id, Transmission::Solution(solution)),
2344 (transaction_transmission_id, Transmission::Transaction(transaction)),
2345 ]
2346 .into();
2347
2348 let batch_header = BatchHeader::new(
2349 private_key,
2350 round,
2351 timestamp,
2352 committee_id,
2353 transmission_ids,
2354 previous_certificate_ids,
2355 rng,
2356 )
2357 .unwrap();
2358 let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2359 let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2360 (certificate, transmissions)
2361 }
2362
2363 fn store_certificate_chain(
2365 primary: &Primary<CurrentNetwork>,
2366 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2367 round: u64,
2368 rng: &mut TestRng,
2369 ) -> IndexSet<Field<CurrentNetwork>> {
2370 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2371 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2372 for cur_round in 1..round {
2373 for (_, account) in accounts.iter() {
2374 let (certificate, transmissions) = create_batch_certificate(
2375 account.address(),
2376 accounts,
2377 cur_round,
2378 previous_certificates.clone(),
2379 rng,
2380 );
2381 next_certificates.insert(certificate.id());
2382 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2383 }
2384
2385 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2386 previous_certificates = next_certificates;
2387 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2388 }
2389
2390 previous_certificates
2391 }
2392
2393 fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2396 for (addr, acct) in accounts.iter().skip(1) {
2398 primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2399 }
2400 }
2401
2402 #[test_log::test(tokio::test)]
2403 async fn test_propose_batch() {
2404 let mut rng = TestRng::default();
2405 let (primary, _) = primary_without_handlers(&mut rng);
2406
2407 assert!(primary.proposed_batch.read().is_none());
2409
2410 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2412 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2413
2414 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2416 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2417
2418 assert!(primary.propose_batch().await.is_ok());
2420 assert!(primary.proposed_batch.read().is_proposed());
2421 }
2422
2423 #[test_log::test(tokio::test)]
2424 async fn test_propose_batch_with_no_transmissions() {
2425 let mut rng = TestRng::default();
2426 let (primary, _) = primary_without_handlers(&mut rng);
2427
2428 assert!(primary.proposed_batch.read().is_none());
2430
2431 assert!(primary.propose_batch().await.is_ok());
2433 assert!(primary.proposed_batch.read().is_proposed());
2434 }
2435
2436 #[test_log::test(tokio::test)]
2437 async fn test_propose_batch_in_round() {
2438 let round = 3;
2439 let mut rng = TestRng::default();
2440 let (primary, accounts) = primary_without_handlers(&mut rng);
2441
2442 store_certificate_chain(&primary, &accounts, round, &mut rng);
2444
2445 tokio::time::sleep(MIN_BATCH_DELAY).await;
2447
2448 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2450 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2451
2452 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2454 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2455
2456 assert!(primary.propose_batch().await.is_ok());
2458 assert!(primary.proposed_batch.read().is_proposed());
2459 }
2460
2461 #[test_log::test(tokio::test)]
2462 async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2463 let round = 3;
2464 let prev_round = round - 1;
2465 let mut rng = TestRng::default();
2466 let (primary, accounts) = primary_without_handlers(&mut rng);
2467 let peer_account = &accounts[1];
2468 let peer_ip = peer_account.0;
2469
2470 store_certificate_chain(&primary, &accounts, round, &mut rng);
2472
2473 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2475
2476 let mut num_transmissions_in_previous_round = 0;
2478
2479 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2481 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2482 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2483 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2484
2485 primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2487 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2488
2489 assert_eq!(primary.workers()[0].num_transmissions(), 2);
2491
2492 for (_, account) in accounts.iter() {
2494 let (certificate, transmissions) = create_batch_certificate(
2495 account.address(),
2496 &accounts,
2497 round,
2498 previous_certificate_ids.clone(),
2499 &mut rng,
2500 );
2501
2502 for (transmission_id, transmission) in transmissions.iter() {
2504 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2505 }
2506
2507 num_transmissions_in_previous_round += transmissions.len();
2509 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2510 }
2511
2512 tokio::time::sleep(MIN_BATCH_DELAY).await;
2514
2515 assert!(primary.storage.increment_to_next_round(round).is_ok());
2517
2518 assert_eq!(primary.workers()[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2520
2521 assert!(primary.propose_batch().await.is_ok());
2523
2524 let proposed_transmissions = primary.proposed_batch.read().as_proposal().unwrap().transmissions().clone();
2526 assert_eq!(proposed_transmissions.len(), 2);
2527 assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2528 assert!(
2529 proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2530 );
2531 }
2532
2533 #[test_log::test(tokio::test)]
2534 async fn test_propose_batch_over_spend_limit() {
2535 let mut rng = TestRng::default();
2536
2537 let (accounts, committee) = sample_committee(&mut rng);
2539 let primary = primary_with_committee(
2540 0,
2541 &accounts,
2542 committee.clone(),
2543 CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2544 );
2545
2546 assert!(primary.proposed_batch.read().is_none());
2548 primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2550
2551 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2553 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2554
2555 for _i in 0..5 {
2556 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2557 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2559 }
2560
2561 assert!(primary.propose_batch().await.is_ok());
2563 assert_eq!(primary.proposed_batch.read().as_proposal().unwrap().transmissions().len(), 3);
2565 assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2567 }
2568
2569 #[test_log::test(tokio::test)]
2570 async fn test_batch_propose_from_peer() {
2571 let mut rng = TestRng::default();
2572 let (primary, accounts) = primary_without_handlers(&mut rng);
2573
2574 let round = 1;
2576 let peer_account = &accounts[1];
2577 let peer_ip = peer_account.0;
2578 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2579 let proposal = create_test_proposal(
2580 &peer_account.1,
2581 primary.ledger.current_committee().unwrap(),
2582 round,
2583 Default::default(),
2584 timestamp,
2585 1,
2586 &mut rng,
2587 );
2588
2589 for (transmission_id, transmission) in proposal.transmissions() {
2591 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2592 }
2593
2594 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2596
2597 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2600 primary.sync.testing_only_set_sync_height_testing_only(20);
2601
2602 assert!(
2604 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2605 );
2606 }
2607
2608 #[test_log::test(tokio::test)]
2609 async fn test_batch_propose_from_peer_when_not_synced() {
2610 let mut rng = TestRng::default();
2611 let (primary, accounts) = primary_without_handlers(&mut rng);
2612
2613 let round = 1;
2615 let peer_account = &accounts[1];
2616 let peer_ip = peer_account.0;
2617 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2618 let proposal = create_test_proposal(
2619 &peer_account.1,
2620 primary.ledger.current_committee().unwrap(),
2621 round,
2622 Default::default(),
2623 timestamp,
2624 1,
2625 &mut rng,
2626 );
2627
2628 for (transmission_id, transmission) in proposal.transmissions() {
2630 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2631 }
2632
2633 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2635
2636 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2638
2639 assert!(
2641 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2642 );
2643 }
2644
2645 #[test_log::test(tokio::test)]
2646 async fn test_batch_propose_from_peer_in_round() {
2647 let round = 2;
2648 let mut rng = TestRng::default();
2649 let (primary, accounts) = primary_without_handlers(&mut rng);
2650
2651 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2653
2654 let peer_account = &accounts[1];
2656 let peer_ip = peer_account.0;
2657 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2658 let proposal = create_test_proposal(
2659 &peer_account.1,
2660 primary.ledger.current_committee().unwrap(),
2661 round,
2662 previous_certificates,
2663 timestamp,
2664 1,
2665 &mut rng,
2666 );
2667
2668 for (transmission_id, transmission) in proposal.transmissions() {
2670 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2671 }
2672
2673 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2675
2676 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2679 primary.sync.testing_only_set_sync_height_testing_only(20);
2680
2681 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2683 }
2684
2685 #[test_log::test(tokio::test)]
2686 async fn test_batch_propose_from_peer_wrong_round() {
2687 let mut rng = TestRng::default();
2688 let (primary, accounts) = primary_without_handlers(&mut rng);
2689
2690 let round = 1;
2692 let peer_account = &accounts[1];
2693 let peer_ip = peer_account.0;
2694 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2695 let proposal = create_test_proposal(
2696 &peer_account.1,
2697 primary.ledger.current_committee().unwrap(),
2698 round,
2699 Default::default(),
2700 timestamp,
2701 1,
2702 &mut rng,
2703 );
2704
2705 for (transmission_id, transmission) in proposal.transmissions() {
2707 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2708 }
2709
2710 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2712 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2714 primary.sync.testing_only_set_sync_height_testing_only(20);
2715
2716 assert!(
2718 primary
2719 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2720 round: round + 1,
2721 batch_header: Data::Object(proposal.batch_header().clone())
2722 })
2723 .await
2724 .is_err()
2725 );
2726 }
2727
2728 #[test_log::test(tokio::test)]
2729 async fn test_batch_propose_from_peer_in_round_wrong_round() {
2730 let round = 4;
2731 let mut rng = TestRng::default();
2732 let (primary, accounts) = primary_without_handlers(&mut rng);
2733
2734 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2736
2737 let peer_account = &accounts[1];
2739 let peer_ip = peer_account.0;
2740 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2741 let proposal = create_test_proposal(
2742 &peer_account.1,
2743 primary.ledger.current_committee().unwrap(),
2744 round,
2745 previous_certificates,
2746 timestamp,
2747 1,
2748 &mut rng,
2749 );
2750
2751 for (transmission_id, transmission) in proposal.transmissions() {
2753 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2754 }
2755
2756 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2758 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2760 primary.sync.testing_only_set_sync_height_testing_only(0);
2761
2762 assert!(
2764 primary
2765 .process_batch_propose_from_peer(peer_ip, BatchPropose {
2766 round: round + 1,
2767 batch_header: Data::Object(proposal.batch_header().clone())
2768 })
2769 .await
2770 .is_err()
2771 );
2772 }
2773
2774 #[test_log::test(tokio::test)]
2776 async fn test_batch_propose_from_peer_with_past_timestamp() {
2777 let round = 2;
2778 let mut rng = TestRng::default();
2779 let (primary, accounts) = primary_without_handlers(&mut rng);
2780
2781 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2783
2784 let peer_account = &accounts[1];
2786 let peer_ip = peer_account.0;
2787
2788 let last_timestamp = primary
2792 .storage
2793 .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2794 .expect("No previous proposal exists")
2795 .timestamp();
2796 let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY.as_secs() as i64) - 1;
2797
2798 let proposal = create_test_proposal(
2799 &peer_account.1,
2800 primary.ledger.current_committee().unwrap(),
2801 round,
2802 previous_certificates,
2803 invalid_timestamp,
2804 1,
2805 &mut rng,
2806 );
2807
2808 for (transmission_id, transmission) in proposal.transmissions() {
2810 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2811 }
2812
2813 primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2815 primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2817 primary.sync.testing_only_set_sync_height_testing_only(0);
2818
2819 assert!(
2821 primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2822 );
2823 }
2824
2825 #[test_log::test(tokio::test)]
2826 async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2827 let round = 3;
2828 let mut rng = TestRng::default();
2829 let (primary, _) = primary_without_handlers(&mut rng);
2830
2831 assert!(primary.proposed_batch.read().is_none());
2833
2834 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2836 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2837
2838 primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2840 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2841
2842 let (old_proposal_round, old_proposal_timestamp) = primary
2844 .latest_proposal_timestamp
2845 .read()
2846 .await
2847 .map(|(round, timestamp)| (round, timestamp))
2848 .unwrap_or((0, 0));
2849 *primary.latest_proposal_timestamp.write().await =
2850 Some((round + 1, old_proposal_timestamp + MIN_BATCH_DELAY.as_secs() as i64));
2851
2852 assert!(primary.propose_batch().await.is_ok());
2854 assert!(primary.proposed_batch.read().is_none());
2855
2856 *primary.latest_proposal_timestamp.write().await = Some((old_proposal_round, old_proposal_timestamp));
2858
2859 assert!(primary.propose_batch().await.is_ok());
2861 assert!(primary.proposed_batch.read().is_proposed());
2862 }
2863
2864 #[test_log::test(tokio::test)]
2865 async fn test_propose_batch_with_storage_round_behind_proposal() {
2866 let round = 5;
2867 let mut rng = TestRng::default();
2868 let (primary, accounts) = primary_without_handlers(&mut rng);
2869
2870 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2872
2873 let timestamp = now();
2875 let proposal = create_test_proposal(
2876 primary.gateway.account(),
2877 primary.ledger.current_committee().unwrap(),
2878 round + 1,
2879 previous_certificates,
2880 timestamp,
2881 1,
2882 &mut rng,
2883 );
2884
2885 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2887
2888 assert!(primary.propose_batch().await.is_ok());
2890 assert!(primary.proposed_batch.read().is_proposed());
2891 assert!(primary.proposed_batch.read().as_proposal().unwrap().round() > primary.current_round());
2892 }
2893
2894 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2895 async fn test_batch_signature_from_peer() {
2896 let mut rng = TestRng::default();
2897 let (primary, accounts) = primary_without_handlers(&mut rng);
2898 map_account_addresses(&primary, &accounts);
2899
2900 let round = 1;
2902 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2903 let proposal = create_test_proposal(
2904 primary.gateway.account(),
2905 primary.ledger.current_committee().unwrap(),
2906 round,
2907 Default::default(),
2908 timestamp,
2909 1,
2910 &mut rng,
2911 );
2912
2913 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2915
2916 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2918
2919 for (socket_addr, signature) in signatures {
2921 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2922 }
2923
2924 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2926 primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2928 assert_eq!(primary.current_round(), round + 1);
2930 }
2931
2932 #[test_log::test(tokio::test(flavor = "multi_thread"))]
2933 async fn test_batch_signature_from_peer_in_round() {
2934 let round = 5;
2935 let mut rng = TestRng::default();
2936 let (primary, accounts) = primary_without_handlers(&mut rng);
2937 map_account_addresses(&primary, &accounts);
2938
2939 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2941
2942 let timestamp = now();
2944 let proposal = create_test_proposal(
2945 primary.gateway.account(),
2946 primary.ledger.current_committee().unwrap(),
2947 round,
2948 previous_certificates,
2949 timestamp,
2950 1,
2951 &mut rng,
2952 );
2953
2954 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2956
2957 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2959
2960 for (socket_addr, signature) in signatures {
2962 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2963 }
2964
2965 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2967 primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2969 assert_eq!(primary.current_round(), round + 1);
2971 }
2972
2973 #[test_log::test(tokio::test)]
2974 async fn test_batch_signature_from_peer_no_quorum() {
2975 let mut rng = TestRng::default();
2976 let (primary, accounts) = primary_without_handlers(&mut rng);
2977 map_account_addresses(&primary, &accounts);
2978
2979 let round = 1;
2981 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2982 let proposal = create_test_proposal(
2983 primary.gateway.account(),
2984 primary.ledger.current_committee().unwrap(),
2985 round,
2986 Default::default(),
2987 timestamp,
2988 1,
2989 &mut rng,
2990 );
2991
2992 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2994
2995 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2997
2998 let (socket_addr, signature) = signatures.first().unwrap();
3000 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3001
3002 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3004 assert_eq!(primary.current_round(), round);
3006 }
3007
3008 #[test_log::test(tokio::test)]
3009 async fn test_batch_signature_from_peer_in_round_no_quorum() {
3010 let round = 7;
3011 let mut rng = TestRng::default();
3012 let (primary, accounts) = primary_without_handlers(&mut rng);
3013 map_account_addresses(&primary, &accounts);
3014
3015 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
3017
3018 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3020 let proposal = create_test_proposal(
3021 primary.gateway.account(),
3022 primary.ledger.current_committee().unwrap(),
3023 round,
3024 previous_certificates,
3025 timestamp,
3026 1,
3027 &mut rng,
3028 );
3029
3030 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
3032
3033 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3035
3036 let (socket_addr, signature) = signatures.first().unwrap();
3038 primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3039
3040 assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3042 assert_eq!(primary.current_round(), round);
3044 }
3045
3046 #[test_log::test(tokio::test)]
3051 async fn test_batch_signature_from_peer_batch_being_certified() {
3052 let mut rng = TestRng::default();
3053 let (primary, accounts) = primary_without_handlers(&mut rng);
3054 map_account_addresses(&primary, &accounts);
3055
3056 let round = 1;
3058 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3059 let proposal = create_test_proposal(
3060 primary.gateway.account(),
3061 primary.ledger.current_committee().unwrap(),
3062 round,
3063 Default::default(),
3064 timestamp,
3065 1,
3066 &mut rng,
3067 );
3068 let batch_id = proposal.batch_id();
3069
3070 *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id);
3072
3073 let (socket_addr, account) =
3075 accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3076 let signature = account.sign(&[batch_id], &mut rng).unwrap();
3077 let batch_signature = BatchSignature::new(batch_id, signature);
3078
3079 assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3081 assert!(matches!(&*primary.proposed_batch.read(), ProposedBatchState::Certified(id) if *id == batch_id));
3083 }
3084
3085 #[test_log::test(tokio::test)]
3088 async fn test_batch_signature_from_peer_unknown_id_while_certifying() {
3089 let mut rng = TestRng::default();
3090 let (primary, accounts) = primary_without_handlers(&mut rng);
3091 map_account_addresses(&primary, &accounts);
3092
3093 let round = 1;
3095 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3096 let proposal_a = create_test_proposal(
3097 primary.gateway.account(),
3098 primary.ledger.current_committee().unwrap(),
3099 round,
3100 Default::default(),
3101 timestamp,
3102 1,
3103 &mut rng,
3104 );
3105 let proposal_b = create_test_proposal(
3106 primary.gateway.account(),
3107 primary.ledger.current_committee().unwrap(),
3108 round,
3109 Default::default(),
3110 timestamp,
3111 1,
3112 &mut rng,
3113 );
3114 let batch_id_a = proposal_a.batch_id();
3115 let batch_id_b = proposal_b.batch_id();
3116 assert_ne!(batch_id_a, batch_id_b);
3117
3118 *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id_a);
3120
3121 let (socket_addr, account) =
3123 accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3124 let signature = account.sign(&[batch_id_b], &mut rng).unwrap();
3125 let batch_signature = BatchSignature::new(batch_id_b, signature);
3126
3127 assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_err());
3129 }
3130
3131 #[test_log::test(tokio::test(flavor = "multi_thread"))]
3134 async fn test_batch_signature_from_peer_already_certified() {
3135 let mut rng = TestRng::default();
3136 let (primary, accounts) = primary_without_handlers(&mut rng);
3137 map_account_addresses(&primary, &accounts);
3138
3139 let round = 1;
3141 let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3142 let old_proposal = create_test_proposal(
3143 primary.gateway.account(),
3144 primary.ledger.current_committee().unwrap(),
3145 round,
3146 Default::default(),
3147 timestamp,
3148 1,
3149 &mut rng,
3150 );
3151 let old_batch_id = old_proposal.batch_id();
3152 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(old_proposal));
3153 let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3154 for (socket_addr, signature) in signatures {
3155 primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
3156 }
3157 assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3159
3160 let new_proposal = create_test_proposal(
3162 primary.gateway.account(),
3163 primary.ledger.current_committee().unwrap(),
3164 round,
3165 Default::default(),
3166 timestamp,
3167 1,
3168 &mut rng,
3169 );
3170 assert_ne!(new_proposal.batch_id(), old_batch_id);
3171 *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(new_proposal));
3172
3173 let (socket_addr, account) =
3175 accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3176 let signature = account.sign(&[old_batch_id], &mut rng).unwrap();
3177 let batch_signature = BatchSignature::new(old_batch_id, signature);
3178
3179 assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3181 }
3182
3183 #[test_log::test(tokio::test)]
3184 async fn test_insert_certificate_with_aborted_transmissions() {
3185 let round = 3;
3186 let prev_round = round - 1;
3187 let mut rng = TestRng::default();
3188 let (primary, accounts) = primary_without_handlers(&mut rng);
3189 let peer_account = &accounts[1];
3190 let peer_ip = peer_account.0;
3191
3192 store_certificate_chain(&primary, &accounts, round, &mut rng);
3194
3195 let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
3197
3198 let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
3200 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
3201
3202 primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
3204 primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
3205
3206 assert_eq!(primary.workers()[0].num_transmissions(), 2);
3208
3209 let account = accounts[0].1.clone();
3211 let (certificate, transmissions) =
3212 create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3213 let certificate_id = certificate.id();
3214
3215 let mut aborted_transmissions = HashSet::new();
3217 let mut transmissions_without_aborted = HashMap::new();
3218 for (transmission_id, transmission) in transmissions.clone() {
3219 match rng.random::<bool>() || aborted_transmissions.is_empty() {
3220 true => {
3221 aborted_transmissions.insert(transmission_id);
3223 }
3224 false => {
3225 transmissions_without_aborted.insert(transmission_id, transmission);
3227 }
3228 };
3229 }
3230
3231 for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3233 primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3234 }
3235
3236 assert!(
3238 primary
3239 .storage
3240 .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3241 .is_err()
3242 );
3243 assert!(
3244 primary
3245 .storage
3246 .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3247 .is_err()
3248 );
3249
3250 primary
3252 .storage
3253 .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3254 .unwrap();
3255
3256 assert!(primary.storage.contains_certificate(certificate_id));
3258 for aborted_transmission_id in aborted_transmissions {
3260 assert!(primary.storage.contains_transmission(aborted_transmission_id));
3261 assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3262 }
3263 }
3264
/// Checks that `add_signature_to_batch` returns an error and hands back `ProposedBatchState::None`
/// when called with the `None` state.
#[test]
fn test_add_signature_to_batch_none_state() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Sign a random batch ID with the second account.
    let (peer_ip, signer) = (accounts[1].0, &accounts[1].1);
    let batch_id = Field::rand(&mut rng);
    let signature = signer.sign(&[batch_id], &mut rng).unwrap();

    let (outcome, state_after) = primary.add_signature_to_batch(ProposedBatchState::None, peer_ip, batch_id, signature);

    // The call fails and the state is unchanged.
    assert!(outcome.is_err());
    assert_eq!(state_after, ProposedBatchState::None);
}
3285
/// Checks that `add_signature_to_batch` returns `Ok(None)` and keeps the `Certified` state when
/// the signature is for the batch ID held in that state.
#[test]
fn test_add_signature_to_batch_certified_matching_id() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Sign a random batch ID with the second account.
    let (peer_ip, signer) = (accounts[1].0, &accounts[1].1);
    let batch_id = Field::rand(&mut rng);
    let signature = signer.sign(&[batch_id], &mut rng).unwrap();

    // Start from the `Certified` state for that same batch ID.
    let state_before = ProposedBatchState::Certified(batch_id);
    let (outcome, state_after) = primary.add_signature_to_batch(state_before, peer_ip, batch_id, signature);

    // The call succeeds without producing a proposal, and the state is unchanged.
    assert!(outcome.unwrap().is_none());
    assert_eq!(state_after, ProposedBatchState::Certified(batch_id));
}
3302
/// Checks that `add_signature_to_batch` returns an error and keeps the `Certified` state when
/// the signature is for a batch ID other than the one held in that state.
#[test]
fn test_add_signature_to_batch_certified_different_id() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Sample two batch IDs and sign the second one with the second account.
    let (peer_ip, signer) = (accounts[1].0, &accounts[1].1);
    let certified_id = Field::rand(&mut rng);
    let other_id = Field::rand(&mut rng);
    let signature = signer.sign(&[other_id], &mut rng).unwrap();

    // Start from the `Certified` state for the first batch ID.
    let state_before = ProposedBatchState::Certified(certified_id);
    let (outcome, state_after) = primary.add_signature_to_batch(state_before, peer_ip, other_id, signature);

    // The call fails and the state is unchanged.
    assert!(outcome.is_err());
    assert_eq!(state_after, ProposedBatchState::Certified(certified_id));
}
3320
3321 #[tokio::test(flavor = "multi_thread")]
3324 async fn test_add_signature_to_batch_certifying_different_id_in_storage() {
3325 let round = 1;
3326 let mut rng = TestRng::default();
3327 let (primary, accounts) = primary_without_handlers(&mut rng);
3328 map_account_addresses(&primary, &accounts);
3329
3330 let proposal = create_test_proposal(
3332 primary.gateway.account(),
3333 primary.ledger.current_committee().unwrap(),
3334 round,
3335 Default::default(),
3336 now(),
3337 0,
3338 &mut rng,
3339 );
3340 let proposal_batch_id = proposal.batch_id();
3341
3342 let (certificate, transmissions) =
3344 create_batch_certificate(accounts[1].1.address(), &accounts, round, Default::default(), &mut rng);
3345 let stored_batch_id = certificate.batch_id();
3346 primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
3347
3348 let peer_ip = accounts[1].0;
3349 let signature = accounts[1].1.sign(&[stored_batch_id], &mut rng).unwrap();
3350
3351 let (result, new_state) = primary.add_signature_to_batch(
3352 ProposedBatchState::Certifying(Box::new(proposal)),
3353 peer_ip,
3354 stored_batch_id,
3355 signature,
3356 );
3357
3358 assert!(result.unwrap().is_none());
3359 assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id);
3361 }
3362
/// Checks that, while certifying a proposal, a signature for a random batch ID yields an error
/// and leaves the proposal in place.
#[test]
fn test_add_signature_to_batch_certifying_different_id_unknown() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);

    // Create the proposal that the primary is certifying.
    let committee = primary.ledger.current_committee().unwrap();
    let own_proposal =
        create_test_proposal(primary.gateway.account(), committee, 1, Default::default(), now(), 0, &mut rng);
    let proposal_batch_id = own_proposal.batch_id();

    // Sign a random batch ID with the second account.
    let (peer_ip, signer) = (accounts[1].0, &accounts[1].1);
    let unknown_id = Field::rand(&mut rng);
    let signature = signer.sign(&[unknown_id], &mut rng).unwrap();

    let state_before = ProposedBatchState::Certifying(Box::new(own_proposal));
    let (outcome, state_after) = primary.add_signature_to_batch(state_before, peer_ip, unknown_id, signature);

    // The call fails and the original proposal is kept.
    assert!(outcome.is_err());
    assert_eq!(state_after.as_proposal().unwrap().batch_id(), proposal_batch_id);
}
3395
/// Checks that a single matching signature for the proposal being certified yields `Ok(None)`
/// and leaves the proposal in place.
#[test]
fn test_add_signature_to_batch_certifying_matching_no_quorum() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);
    map_account_addresses(&primary, &accounts);

    // Create the proposal that the primary is certifying.
    let committee = primary.ledger.current_committee().unwrap();
    let own_proposal =
        create_test_proposal(primary.gateway.account(), committee, 1, Default::default(), now(), 0, &mut rng);
    let batch_id = own_proposal.batch_id();

    // Sign the proposal's batch ID with the second account only.
    let (peer_ip, signer) = (accounts[1].0, &accounts[1].1);
    let signature = signer.sign(&[batch_id], &mut rng).unwrap();

    let state_before = ProposedBatchState::Certifying(Box::new(own_proposal));
    let (outcome, state_after) = primary.add_signature_to_batch(state_before, peer_ip, batch_id, signature);

    // The call succeeds without producing a proposal, and the proposal is still held.
    assert!(outcome.unwrap().is_none());
    assert_eq!(state_after.as_proposal().unwrap().batch_id(), batch_id);
}
3428
/// Checks that feeding matching peer signatures one by one eventually returns the proposal and
/// moves the state to `Certified` with the proposal's batch ID.
#[test]
fn test_add_signature_to_batch_certifying_matching_quorum_reached() {
    let mut rng = TestRng::default();
    let (primary, accounts) = primary_without_handlers(&mut rng);
    map_account_addresses(&primary, &accounts);

    // Create the proposal that the primary is certifying.
    let committee = primary.ledger.current_committee().unwrap();
    let own_proposal =
        create_test_proposal(primary.gateway.account(), committee, 1, Default::default(), now(), 0, &mut rng);
    let batch_id = own_proposal.batch_id();

    // Add signatures from every account other than the primary until a proposal is returned.
    let own_address = primary.gateway.account().address();
    let mut state = ProposedBatchState::Certifying(Box::new(own_proposal));
    let mut certified_proposal = None;
    for (peer_ip, peer_account) in accounts.iter().filter(|(_, candidate)| candidate.address() != own_address) {
        let signature = peer_account.sign(&[batch_id], &mut rng).unwrap();
        let (outcome, next_state) = primary.add_signature_to_batch(state, *peer_ip, batch_id, signature);
        state = next_state;

        // Any error panics right here; a returned proposal ends the loop.
        if let Some(proposal) = outcome.unwrap() {
            certified_proposal = Some(proposal);
            break;
        }
    }

    // A proposal with the expected batch ID was returned and the state moved to `Certified`.
    let proposal = certified_proposal.expect("quorum should be reached");
    assert_eq!(proposal.batch_id(), batch_id);
    assert_eq!(state, ProposedBatchState::Certified(batch_id));
}
3469}