1use crate::{
17 DEVELOPMENT_MODE_RNG_SEED,
18 MAX_BATCH_DELAY_IN_MS,
19 MAX_WORKERS,
20 MIN_BATCH_DELAY_IN_SECS,
21 Sync,
22 Worker,
23 helpers::{
24 BFTSender,
25 PrimaryReceiver,
26 PrimarySender,
27 Proposal,
28 ProposalCache,
29 SignedProposals,
30 Storage,
31 assign_to_worker,
32 assign_to_workers,
33 fmt_id,
34 now,
35 },
36 spawn_blocking,
37};
38use amareleo_chain_account::Account;
39use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_sync::DUMMY_SELF_IP;
42use snarkvm::{
43 console::{prelude::*, types::Address},
44 ledger::{
45 block::Transaction,
46 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
47 puzzle::{Solution, SolutionID},
48 },
49 prelude::{Signature, committee::Committee},
50};
51
52use aleo_std::StorageMode;
53use colored::Colorize;
54use futures::stream::{FuturesUnordered, StreamExt};
55use indexmap::IndexMap;
56#[cfg(feature = "locktick")]
57use locktick::{
58 parking_lot::{Mutex, RwLock},
59 tokio::Mutex as TMutex,
60};
61#[cfg(not(feature = "locktick"))]
62use parking_lot::{Mutex, RwLock};
63use rand::SeedableRng;
64use rand_chacha::ChaChaRng;
65use snarkvm::console::account::PrivateKey;
66
67use std::{
68 collections::{HashMap, HashSet},
69 future::Future,
70 net::SocketAddr,
71 sync::{
72 Arc,
73 atomic::{AtomicBool, Ordering},
74 },
75 time::Duration,
76};
77#[cfg(not(feature = "locktick"))]
78use tokio::sync::Mutex as TMutex;
79use tokio::{sync::OnceCell, task::JoinHandle};
80use tracing::subscriber::DefaultGuard;
81
/// The batch proposal currently owned by this primary, if any (shared behind a lock).
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
84
/// The primary instance of the memory pool: drives batch proposals, certification,
/// and round advancement. Cheap to clone — all mutable state is behind `Arc`s.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, built over the same storage and ledger handles.
    sync: Sync<N>,
    /// The account (key pair) this primary signs and proposes with.
    account: Account<N>,
    /// The certificate/transmission storage.
    storage: Storage<N>,
    /// Whether the proposal cache should be persisted on shutdown.
    keep_state: bool,
    /// Armed in `run` when `keep_state` is set; swapped-and-cleared in `shut_down`
    /// so the cache is saved at most once.
    save_pending: Arc<AtomicBool>,
    /// Selects where on disk the proposal cache is stored/loaded.
    storage_mode: StorageMode,
    /// The ledger service used for committee lookbacks and transmission checks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers holding unconfirmed transmissions; populated in `run`
    /// (the proposal loop asserts `MAX_WORKERS == 1`).
    workers: Arc<[Worker<N>]>,
    /// Channel to the BFT; set at most once, in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal currently in flight, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// Timestamp of the most recent batch proposal; used by
    /// `check_proposal_timestamp` to enforce the minimum batch delay.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// Signatures produced for proposals, keyed by proposal author address.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// Handles of the background tasks spawned by this primary (aborted on shutdown).
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Optional tracing handler used to scope log output.
    tracing: Option<TracingHandler>,
    /// Async lock whose value is the highest round already proposed/cached;
    /// also serializes concurrent proposal attempts.
    propose_lock: Arc<TMutex<u64>>,
}
118
119impl<N: Network> TracingHandlerGuard for Primary<N> {
120 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
122 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
123 }
124}
125
impl<N: Network> Primary<N> {
    /// Maximum number of transmissions tolerated beyond a single batch's
    /// capacity (twice the per-batch transmission limit).
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary with no workers; the workers are created in `run`.
    ///
    /// `keep_state` controls whether the proposal cache is persisted on shutdown
    /// (see `shut_down`); `storage_mode` selects where that cache lives on disk.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        keep_state: bool,
        storage_mode: StorageMode,
        ledger: Arc<dyn LedgerService<N>>,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Initialize the sync module over the same storage and ledger handles.
        let sync = Sync::new(storage.clone(), ledger.clone());

        Ok(Self {
            sync,
            account,
            storage,
            keep_state,
            // Saving of the proposal cache is armed later, in `run`, and only
            // when `keep_state` is set.
            save_pending: Arc::new(AtomicBool::new(false)),
            storage_mode,
            ledger,
            // No workers yet; `run` populates this slice.
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            tracing,
            propose_lock: Default::default(),
        })
    }

    /// Restores state from the on-disk proposal cache, if one exists: the
    /// in-flight proposal, the signed-proposals map, the latest certificate
    /// round (into `propose_lock`), and any pending certificates (replayed
    /// through `sync_with_certificate_from_peer`).
    async fn load_proposal_cache(&self) -> Result<()> {
        // A missing cache file is not an error — there is simply nothing to restore.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            true => match ProposalCache::<N>::load(self.account.address(), &self.storage_mode, self) {
                Ok(proposal_cache) => {
                    // Decompose the cache into its constituent parts.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Restore the proposed batch, signed proposals, and propose lock.
                    *self.proposed_batch.write() = proposed_batch;
                    *self.signed_proposals.write() = signed_proposals;
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Replay the pending certificates as if received from ourselves
                    // (DUMMY_SELF_IP); individual failures are logged, not fatal.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            guard_warn!(
                                self,
                                "Failed to load stored certificate {} from proposal cache - {err}",
                                fmt_id(batch_id)
                            );
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            false => Ok(()),
        }
    }

    /// Starts the primary: registers the BFT sender (at most once), constructs
    /// the workers, initializes and runs the sync module, restores the proposal
    /// cache, spawns the background handlers, and — when `keep_state` is set —
    /// arms the proposal-cache save performed at shutdown.
    ///
    /// `_primary_sender` is currently unused.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        _primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        guard_info!(self, "Starting the primary instance of the memory pool...");

        // Set the BFT sender; the OnceCell guarantees it is only ever set once.
        if let Some(bft_sender) = &bft_sender {
            if self.bft_sender.set(bft_sender.clone()).is_err() {
                guard_error!(self, "Unexpected: BFT sender already set");
                bail!("Unexpected: BFT sender already set");
            }
        }

        // Construct the workers (MAX_WORKERS of them).
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let worker = Worker::new(
                id,
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
                self.tracing.clone(),
            )?;

            workers.push(worker);
        }
        self.workers = Arc::from(workers);

        // Initialize sync, restore any cached proposals, then start syncing.
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run().await?;

        // Spawn the background handler tasks.
        self.start_handlers(primary_receiver);

        // Arm the shutdown-time proposal-cache save.
        if self.keep_state {
            self.save_pending.store(true, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Returns the current round, as reported by storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the sync module reports the node as synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the primary's account.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the storage handle.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers as a `u8`.
    ///
    /// # Panics
    /// Panics if there are more than `u8::MAX` workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }
}
292
293impl<N: Network> Primary<N> {
294 pub fn num_unconfirmed_transmissions(&self) -> usize {
296 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
297 }
298
299 pub fn num_unconfirmed_ratifications(&self) -> usize {
301 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
302 }
303
304 pub fn num_unconfirmed_solutions(&self) -> usize {
306 self.workers.iter().map(|worker| worker.num_solutions()).sum()
307 }
308
309 pub fn num_unconfirmed_transactions(&self) -> usize {
311 self.workers.iter().map(|worker| worker.num_transactions()).sum()
312 }
313}
314
315impl<N: Network> Primary<N> {
316 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
318 self.workers.iter().flat_map(|worker| worker.transmission_ids())
319 }
320
321 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
323 self.workers.iter().flat_map(|worker| worker.transmissions())
324 }
325
326 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
328 self.workers.iter().flat_map(|worker| worker.solutions())
329 }
330
331 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
333 self.workers.iter().flat_map(|worker| worker.transactions())
334 }
335}
336
337impl<N: Network> Primary<N> {
338 pub fn clear_worker_solutions(&self) {
340 self.workers.iter().for_each(Worker::clear_solutions);
341 }
342}
343
impl<N: Network> Primary<N> {
    /// Proposes a batch for the current round, then (when `complete` is set and
    /// a round was actually proposed) fabricates matching proposals and
    /// certificates for the remaining deterministic dev validators.
    ///
    /// The four validator accounts are re-derived on every call from the fixed
    /// `DEVELOPMENT_MODE_RNG_SEED`, so they are stable across calls; validator 0
    /// is presumably this node's own account — TODO confirm against node setup.
    pub async fn propose_batch(&self, complete: bool) -> Result<()> {
        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
        let mut all_acc: Vec<Account<N>> = Vec::new();
        for _ in 0u64..4u64 {
            let private_key = PrivateKey::<N>::new(&mut rng)?;
            let acc = Account::<N>::try_from(private_key)?;
            all_acc.push(acc);
        }

        // Validators 1..3 act as the "other" signers for our own proposal.
        let other_acc: Vec<&Account<N>> = all_acc.iter().skip(1).collect();
        let round = self.propose_batch_lite(&other_acc, complete).await?;
        // A returned round of 0 means the proposal was (safely) skipped.
        if !complete || round == 0u64 {
            return Ok(());
        }

        // Fabricate a proposal for each remaining validator, signed by everyone
        // except that validator itself.
        for vid in 1u64..4u64 {
            let primary_acc = &all_acc[vid as usize];
            let other_acc: Vec<&Account<N>> =
                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();

            self.fake_proposal(vid, primary_acc, &other_acc, round).await?;
        }
        Ok(())
    }

    /// Attempts to propose a batch for the current round by draining
    /// transmissions from the workers. Returns the proposed round on success,
    /// or `Ok(0)` when the proposal is safely skipped (already proposed or
    /// certified, minimum delay not elapsed, quorum not reached, ...).
    /// Holds `propose_lock` for the entire attempt.
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>], complete: bool) -> Result<u64> {
        // Serializes proposal attempts; the guarded value is the last proposed round.
        let mut lock_guard = self.propose_lock.lock().await;

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // There is no proposal for round 0.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current round is behind the latest cached/proposed round, skip.
        if round < *lock_guard {
            guard_warn!(
                self,
                "Cannot propose a batch for round {round} - the latest proposal cache round is {}",
                *lock_guard
            );
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Enforce the minimum delay since our previous certificate/proposal.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} - {}",
                format!("{e}").dimmed()
            );
            return Ok(0u64);
        }

        // If we already certified this round, just nudge the BFT forward and skip.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(0u64),
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            guard_debug!(
                self,
                "Primary is safely skipping {}",
                format!("(round {round} was already certified)").dimmed()
            );
            return Ok(0u64);
        }

        // If this exact round was already proposed, skip.
        if round == *lock_guard {
            guard_warn!(self, "Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        // The set of "connected" validators (the fixed dev accounts plus
        // ourselves) must reach the committee quorum threshold.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            connected_validators.insert(self.account.address());

            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                guard_debug!(
                    self,
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                guard_trace!(self, "Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        // The previous round must have reached quorum (by certificate authors)
        // before we may propose — unless this is the very first round.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Drain transmissions from the workers into the batch, subject to the
        // per-batch / per-worker limits and the batch spend limit.
        let mut transmissions: IndexMap<_, _> = Default::default();
        let mut proposal_cost = 0u64;
        // The draining logic below assumes a single worker.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers.iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Batch is full: stop draining entirely.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    break 'outer;
                }

                // This worker hit its cap: move on to the next worker.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    continue 'outer;
                }

                // Skip (and drop) anything already in the ledger; a lookup error
                // is conservatively treated as "present" (unwrap_or(true)).
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // NOTE(review): the storage-dedup check is bypassed for the very
                // first transmission of the batch (`!transmissions.is_empty()`) —
                // looks deliberate for this lite/dev node, but confirm.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Per-kind validation. Note the clone: the original transmission
                // is kept for insertion / re-queuing below.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // The solution checksum must match the transmission ID.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            guard_trace!(
                                self,
                                "Proposing - Skipping solution '{}' - Checksum mismatch",
                                fmt_id(solution_id)
                            );
                            continue;
                        }
                        // Basic solution validity check via the ledger.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            guard_trace!(self, "Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // The transaction checksum must match the transmission ID.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
                        {
                            guard_trace!(
                                self,
                                "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Deserialize off the async runtime; buffered reads are
                        // capped at MAX_TRANSACTION_SIZE bytes.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Basic transaction validity check via the ledger.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
                            guard_trace!(self, "Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the spend cost; transactions whose cost cannot
                        // be computed are discarded (not re-queued).
                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(transaction_id, transaction)
                        else {
                            guard_debug!(
                                self,
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Guard against u64 overflow of the running batch cost.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            guard_debug!(
                                self,
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Over the batch spend limit: push this transmission back
                        // to the worker's front and stop draining.
                        if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                            guard_trace!(
                                self,
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                BatchHeader::<N>::BATCH_SPEND_LIMIT
                            );

                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Commit the accumulated cost.
                        proposal_cost = next_proposal_cost;
                    }

                    // Ratifications and any mismatched (ID, payload) pair are dropped.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    _ => continue,
                }

                // Accept the transmission into the batch.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        let current_timestamp = now();

        // Record that this round has now been proposed (checked above as
        // `round == *lock_guard`).
        *lock_guard = round;

        guard_info!(self, "Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Build and sign the batch header on a blocking thread, then wrap it in
        // a Proposal. On any failure, re-queue the drained transmissions.
        let private_key = *self.account.private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // Don't lose the transmissions if header/proposal construction fails.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                guard_error!(self, "Failed to reinsert transmissions: {e:?}");
            }
        })?;
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        // Optionally certify the batch right away (dev flow).
        if complete {
            self.complete_batch_lite(other_acc, round, batch_header, proposal, committee_lookback).await?;
        }

        Ok(round)
    }

    /// Certifies our own proposal by collecting signatures from the given
    /// "other" dev accounts, then stores and forwards the resulting certificate.
    /// If storing/broadcasting fails, the proposal's transmissions are re-queued.
    pub async fn complete_batch_lite(
        &self,
        other_acc: &[&Account<N>],
        round: u64,
        batch_header: BatchHeader<N>,
        mut proposal: Proposal<N>,
        committee_lookback: Committee<N>,
    ) -> Result<()> {
        guard_info!(self, "Quorum threshold reached - Preparing to certify our batch for round {round}...");

        let batch_id = batch_header.batch_id();

        // Sign the batch ID with each of the other validators' accounts
        // (signing runs on a blocking thread).
        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Store and forward the certificate; on failure, recover the
        // transmissions back into the workers before propagating the error.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

    /// Fabricates an empty batch proposal on behalf of dev validator `vid`,
    /// collects signatures from `other_acc` (which must include our own
    /// account), stores the resulting certificate, records our signature in
    /// `signed_proposals`, and forwards the certificate to the BFT.
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        // Fake proposals carry no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Build the batch header and proposal on a blocking thread.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        let batch_id = batch_header.batch_id();
        // Our own signature, captured while signing below; required further down.
        let mut our_sign: Option<Signature<N>> = None;

        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // `other_acc` must have contained validator 0 (our own account).
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Aggregate the signatures into a certificate (CPU-bound, so run via
        // block_in_place rather than on the async reactor).
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_info!(self, "Stored a batch certificate for validator/round {vid}/{round}");

        // Record our signature for this author. NOTE(review): when an entry for
        // this author already holds the same round, this returns early — the
        // certificate stored above is then NOT forwarded to the BFT below;
        // presumably intentional dedup, confirm.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == round {
                    return Ok(());
                }
                entry.insert((round, batch_id, our_sign));
                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((round, batch_id, our_sign));
                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        // Feed the certificate into the BFT DAG.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
}
822
impl<N: Network> Primary<N> {
    /// Spawns the primary's background tasks: the periodic proposal loop, the
    /// periodic round-advance loop, and the intake loops for unconfirmed
    /// solutions and transactions.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver { mut rx_unconfirmed_solution, mut rx_unconfirmed_transaction } = primary_receiver;

        // Proposal loop: every MAX_BATCH_DELAY_IN_MS, try to propose a batch.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // Don't propose while syncing.
                if !self_.is_synced() {
                    guard_debug!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is syncing)".dimmed()
                    );
                    continue;
                }
                // If the propose lock is held, a proposal is already in flight —
                // skip this tick. (The probe guard is dropped immediately;
                // propose_batch re-acquires the lock itself.)
                if self_.propose_lock.try_lock().is_err() {
                    guard_trace!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch(true).await {
                    guard_warn!(self_, "Cannot propose a batch - {e}");
                }
            }
        });

        // Round-advance loop: once the NEXT round has reached quorum, move on.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.is_synced() {
                    guard_trace!(self_, "Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let next_round = self_.current_round().saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // No certificates for the next round yet — nothing to do.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        guard_warn!(self_, "Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    guard_debug!(self_, "Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        guard_warn!(self_, "Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Unconfirmed-solution intake: route each solution to its worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };

                let worker = &self_.workers[worker_id as usize];
                let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                // A dropped callback receiver is deliberately ignored.
                callback.send(result).ok();
            }
        });

        // Unconfirmed-transaction intake: route each transaction to its worker.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                guard_trace!(self_, "Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };

                let worker = &self_.workers[worker_id as usize];
                let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                // A dropped callback receiver is deliberately ignored.
                callback.send(result).ok();
            }
        });
    }

    /// Advances storage toward `next_round` (fast-forwarding through skipped
    /// rounds within the GC window), notifies the BFT, and — if ready —
    /// immediately attempts the next batch proposal.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // Only fast-forward when the target round is within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Stop one round short of `next_round`; the final step happens below.
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Any in-flight proposal is now stale.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // With a BFT attached, the BFT decides readiness; without one,
            // advance storage directly and consider ourselves ready.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            // Log whether the node may propose the next round.
            match is_ready {
                true => guard_debug!(self, "Primary is ready to propose the next round"),
                false => guard_debug!(self, "Primary is not ready to propose the next round"),
            }

            // Propose right away instead of waiting for the next timer tick.
            if is_ready {
                self.propose_batch(true).await?;
            }
        }
        Ok(())
    }

    /// Same as `try_increment_to_the_next_round`, but does NOT kick off a new
    /// proposal when ready — used from the certification path (see
    /// `store_and_broadcast_certificate_lite`).
    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
        // Only fast-forward when the target round is within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Any in-flight proposal is now stale.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            // Unlike the non-lite variant, readiness is only logged; no new
            // proposal is triggered from here.
            match is_ready {
                true => guard_debug!(self, "Primary is ready to propose the next round"),
                false => guard_debug!(self, "Primary is not ready to propose the next round"),
            }

        }
        Ok(())
    }

    /// Ensures at least MIN_BATCH_DELAY_IN_SECS elapsed since our certificate
    /// at `previous_round` (or, when absent, since our latest proposed batch).
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Baseline: our certificate at the previous round, else our latest proposal.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => *self.latest_proposed_batch_timestamp.read(),
        };

        // `checked_sub` only fails on i64 overflow; a negative elapsed value is
        // caught by the minimum-delay comparison below.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

    /// Aggregates the proposal into a certificate, stores it (with its
    /// transmissions), forwards it to the BFT, and advances to the next round
    /// via the `_lite` path (no immediate re-proposal).
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        // Create the certificate (CPU-bound; run via block_in_place).
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap for storage insertion.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_debug!(self, "Stored a batch certificate for round {}", certificate.round());
        // Feed the certificate into the BFT DAG.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                guard_warn!(self, "Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        guard_info!(self, "\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Advance without immediately proposing again (the caller drives that).
        self.try_increment_to_the_next_round_lite(round + 1).await
    }

    /// Re-queues the given transmissions into their assigned workers (used to
    /// recover after a failed proposal or certification).
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Processes a certificate received from a peer (or replayed from the
    /// proposal cache): validates its batch header first (which may recursively
    /// advance our round), then stores the certificate and forwards it to the
    /// BFT. `IS_SYNCING` bypasses the node-is-synced guard.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header and its round.
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Ignore rounds that were already garbage-collected.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // Ignore certificates we already hold.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // Unless explicitly syncing, refuse work while the node is not synced.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Validate the header and collect any transmissions we were missing.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Re-check: the header sync above may have stored this certificate already.
        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            guard_debug!(self, "Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(bft_sender) = self.bft_sender.get() {
                // Feed the stored certificate into the BFT DAG.
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

    /// Validates a batch header from a peer, advancing our round when the
    /// header shows we are behind, and returns the transmissions referenced by
    /// the header that our storage was missing.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // Reject rounds that were already garbage-collected.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // Unless explicitly syncing, refuse work while the node is not synced.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine whether the batch's round has already reached quorum here.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Catch our round up when we're behind schedule or the peer is far ahead.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Fetch whatever transmissions from this header we do not yet hold.
        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }

    /// Concurrently gathers, via the workers, every transmission referenced by
    /// `batch_header` that is not already in storage. Returns an empty map for
    /// GC'd rounds or already-processed batches.
    async fn fetch_missing_transmissions(
        &self,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Nothing to fetch for garbage-collected rounds.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // Nothing to fetch if the batch itself is already known.
        if self.storage.contains_batch(batch_header.batch_id()) {
            guard_trace!(self, "Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // Run all fetches concurrently, collecting results as they complete.
        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                // Route the fetch to the worker responsible for this ID.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
            }
        }

        // Any single failed fetch fails the whole call.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }
}
1277
impl<N: Network> Primary<N> {
    /// Spawns `future` on the tokio runtime and records its handle so it can be
    /// aborted during shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary: aborts all spawned tasks, then (at most once)
    /// persists the in-flight proposal state to disk.
    pub async fn shut_down(&self) {
        guard_info!(self, "Shutting down the primary...");
        {
            // Abort every background task, then drop the handles. Scoped so the
            // handles lock is released before any await below.
            let mut handles = self.handles.lock();
            handles.iter().for_each(|handle| handle.abort());
            handles.clear();
        }

        // Atomically claim the save: `swap(false)` ensures only one caller ever
        // persists the proposal cache, even if `shut_down` is invoked twice.
        let save_pending = self.save_pending.swap(false, Ordering::AcqRel);
        if save_pending {
            let proposal_cache = {
                // Take (not clone) the current proposal so it cannot be reused.
                let proposal = self.proposed_batch.write().take();
                let signed_proposals = self.signed_proposals.read().clone();
                // Fall back to the propose-lock round when there is no live proposal.
                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
                let pending_certificates = self.storage.get_pending_certificates();
                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
            };

            // Persistence failure is logged, not propagated — shutdown proceeds regardless.
            if let Err(err) = proposal_cache.store(&self.storage_mode, self) {
                guard_error!(self, "Failed to store the current proposal cache: {err}");
            }
        }
    }
}
1313
#[cfg(test)]
mod tests {
    use super::*;
    use amareleo_node_bft_ledger_service::MockLedgerService;
    use amareleo_node_bft_storage_service::BFTMemoryService;
    use snarkvm::{
        console::types::Field,
        ledger::{
            committee::{Committee, MIN_VALIDATOR_STAKE},
            ledger_test_helpers::sample_execution_transaction_with_fee,
        },
        prelude::{Address, Signature},
    };

    use bytes::Bytes;
    use indexmap::IndexSet;
    use rand::RngCore;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Builds a deterministic 4-member committee at starting round 1, together with
    /// an `(address, account)` pair for each member. Validator keys are derived from
    /// the fixed development seed so the committee is reproducible across runs; only
    /// the third tuple element of each member entry comes from the caller's `rng`.
    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
        const COMMITTEE_SIZE: usize = 4;
        // Deterministic key material: all validator accounts come from the development seed.
        let mut rng_validator = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
        let mut members = IndexMap::new();

        for i in 0..COMMITTEE_SIZE {
            // One unique localhost address per member (ports 5000..5003).
            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
            let account = Account::new(&mut rng_validator).unwrap();

            // Member entry is (MIN_VALIDATOR_STAKE, true, random u8-range value).
            // NOTE(review): the second/third fields are presumably is-open + a
            // per-member metadata value — confirm against `Committee::new`.
            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
            accounts.push((socket_addr, account));
        }

        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
    }

    /// Constructs a `Primary` backed by a mock ledger fixed at `height`, authored by
    /// the account at `account_index`, with a single worker and no network handlers.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        // In-memory BFT storage; the `10` is presumably the max GC rounds — see `Storage::new`.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);

        let account = accounts[account_index].1.clone();
        let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger, None).unwrap();

        // Replace the primary's workers with a single worker (id 0) that shares the
        // primary's storage, ledger, and proposed-batch state.
        primary.workers = Arc::from([Worker::new(
            0, primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
            None,
        )
        .unwrap()]);

        primary
    }

    /// Convenience wrapper: samples a committee and builds the primary for member 0
    /// at the V1 consensus height.
    fn primary_without_handlers(
        rng: &mut TestRng,
    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
        let (accounts, committee) = sample_committee(rng);
        let primary = primary_with_committee(
            0, &accounts,
            committee,
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
        );

        (primary, accounts)
    }

    /// Samples a fake solution: a random ID plus an opaque buffer of 1 KiB..10 KiB
    /// random bytes. The bytes are not a valid puzzle solution; these tests only
    /// exercise the transmission plumbing, not solution verification.
    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
        let solution_id = rng.gen::<u64>().into();
        let size = rng.gen_range(1024..10 * 1024);
        let mut vec = vec![0u8; size];
        rng.fill_bytes(&mut vec);
        let solution = Data::Buffer(Bytes::from(vec));
        (solution_id, solution)
    }

    /// Samples a genuine execution transaction (with fee) and returns it with its ID.
    fn sample_unconfirmed_transaction(
        rng: &mut TestRng,
    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
        let transaction = sample_execution_transaction_with_fee(false, rng);
        let id = transaction.id();

        (id, Data::Object(transaction))
    }

    /// Builds a signed `Proposal` authored by `author` for `round`, containing one
    /// sampled solution and `num_transactions` sampled transactions.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        num_transactions: u64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        let mut transmission_ids = IndexSet::new();
        let mut transmissions = IndexMap::new();

        // One solution transmission, keyed by (id, checksum)...
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let solution_transmission_id = (solution_id, solution_checksum).into();
        transmission_ids.insert(solution_transmission_id);
        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));

        // ...plus the requested number of transaction transmissions.
        for _ in 0..num_transactions {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
            transmission_ids.insert(transaction_transmission_id);
            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
        }

        // Sign the batch header with the author's private key, then wrap header +
        // transmissions into a proposal for the given committee.
        let private_key = author.private_key();
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }

    /// Collects signatures over `batch_id` from every account except the author's
    /// own — i.e. the peer signatures that back a batch certificate.
    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            // The author does not countersign its own batch; only peers do.
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

    /// Builds a fully signed `BatchCertificate` authored by `primary_address` for
    /// `round`, containing one sampled solution and one sampled transaction.
    /// Returns the certificate together with its transmissions so the caller can
    /// insert both into storage.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Look up the authoring account by address; panics if it is not a member.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // NOTE(review): the committee ID here is random, not the real committee's
        // ID — presumably storage insertion in these tests does not validate it;
        // confirm against `Storage::insert_certificate`.
        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Gather peer signatures over the batch ID and seal the certificate.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

    /// Populates storage with a full certificate chain: for every round in
    /// `1..round`, inserts one certificate per committee member and advances the
    /// storage round. Returns the certificate IDs of the last completed round,
    /// for use as the previous-certificate set of round `round`.
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            // The certificates just created become the previous set for the next round.
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

    /// A fresh primary with queued transmissions can propose a batch.
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // No proposal exists yet.
        assert!(primary.proposed_batch.read().is_none());

        // Queue one solution and one transaction into worker 0.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch(false).await.is_ok());

    }

    /// Proposing with no queued transmissions still succeeds (it is not an error).
    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        assert!(primary.propose_batch(false).await.is_ok());

    }

    /// Proposing works after storage has advanced past round 1 via a stored
    /// certificate chain.
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Advance storage to `round` by inserting certificates for earlier rounds.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Wait out the minimum batch delay so the proposal is not throttled.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch(false).await.is_ok());

    }

    /// Under V4 consensus rules, a proposal stops accepting transmissions once the
    /// spend limit is hit, leaving the excess queued in the workers.
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Build the primary at the V4 height so spend-limit rules apply.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        assert!(primary.proposed_batch.read().is_none());
        primary.workers.iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Queue one solution and five transactions (six transmissions total).
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _i in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        assert!(primary.propose_batch(false).await.is_ok());

        // Three transmissions remain queued — presumably the spend limit admitted
        // only three of the six into the proposal; confirm against the V4 rules.
        assert_eq!(primary.workers.iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

    /// When the propose-lock round is ahead of storage, proposing is a silent
    /// no-op; restoring the lock round lets proposing proceed again.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Artificially push the propose lock ahead of the storage round.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Proposing succeeds but produces no proposal (lock round is ahead).
        assert!(primary.propose_batch(false).await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the lock; proposing is possible again.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        assert!(primary.propose_batch(false).await.is_ok());

    }

    /// A pre-existing proposal for a future round is kept: proposing again does
    /// not discard it, and its round stays ahead of the current round.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Install a proposal one round ahead of the stored chain.
        let timestamp = now();
        let proposal = create_test_proposal(
            &primary.account,
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        assert!(primary.propose_batch(false).await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
}