1use crate::{
17 MAX_BATCH_DELAY_IN_MS,
18 MAX_WORKERS,
19 MIN_BATCH_DELAY_IN_SECS,
20 Sync,
21 Worker,
22 helpers::{
23 BFTSender,
24 PrimaryReceiver,
25 PrimarySender,
26 Proposal,
27 ProposalCache,
28 SignedProposals,
29 Storage,
30 assign_to_worker,
31 assign_to_workers,
32 fmt_id,
33 now,
34 },
35 spawn_blocking,
36};
37use amareleo_chain_account::Account;
38use amareleo_node_bft_ledger_service::LedgerService;
39use amareleo_node_sync::DUMMY_SELF_IP;
40use snarkvm::{
41 console::{prelude::*, types::Address},
42 ledger::{
43 block::Transaction,
44 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
45 puzzle::{Solution, SolutionID},
46 },
47 prelude::{Signature, committee::Committee},
48};
49
50use aleo_std::StorageMode;
51use colored::Colorize;
52use futures::stream::{FuturesUnordered, StreamExt};
53use indexmap::IndexMap;
54use parking_lot::{Mutex, RwLock};
55
56use rand::SeedableRng;
57use rand_chacha::ChaChaRng;
58use snarkvm::console::account::PrivateKey;
59
60use std::{
61 collections::{HashMap, HashSet},
62 future::Future,
63 net::SocketAddr,
64 sync::Arc,
65 time::Duration,
66};
67use tokio::{
68 sync::{Mutex as TMutex, OnceCell},
69 task::JoinHandle,
70};
71
/// A shared handle to the batch proposal currently awaiting certification, if any.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, used to catch the primary up with the ledger.
    sync: Sync<N>,
    /// The node account (signing key and address) used to author batches.
    account: Account<N>,
    /// The storage holding rounds, certificates, and transmissions.
    storage: Storage<N>,
    /// Whether to persist the proposal cache to disk on shutdown (see `shut_down`).
    keep_state: bool,
    /// The storage mode, used to locate the on-disk proposal cache.
    storage_mode: StorageMode,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers, each holding a shard of the pending transmissions.
    workers: Arc<[Worker<N>]>,
    /// The sender channel to the BFT; set at most once in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal currently awaiting certification, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recently proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The proposals this node has signed, keyed by the author's address.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// Handles of the spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Serializes batch proposing; holds the latest proposed (or cached) round.
    propose_lock: Arc<TMutex<u64>>,
}
104
impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions tolerated by the primary
    /// (twice the per-batch maximum).
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// Workers are intentionally left empty here; they are constructed in [`Primary::run`].
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        keep_state: bool,
        storage_mode: StorageMode,
        ledger: Arc<dyn LedgerService<N>>,
    ) -> Result<Self> {
        // Initialize the sync module.
        let sync = Sync::new(storage.clone(), ledger.clone());

        // Initialize the primary instance with default (empty) shared state.
        Ok(Self {
            sync,
            account,
            storage,
            keep_state,
            storage_mode,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
        })
    }

    /// Loads the proposal cache from the file system, if one exists, restoring the
    /// proposed batch, the signed proposals, the propose-lock round, and replaying
    /// any pending certificates into storage (and the BFT, via the sync path).
    ///
    /// # Errors
    /// Fails if the cache file exists but cannot be read; a certificate that fails
    /// to replay only logs a warning and is skipped.
    async fn load_proposal_cache(&self) -> Result<()> {
        match ProposalCache::<N>::exists(self.keep_state, &self.storage_mode) {
            true => {
                match ProposalCache::<N>::load(self.account.address(), self.keep_state, &self.storage_mode) {
                    Ok(proposal_cache) => {
                        // Decompose the cache into its parts.
                        let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                            proposal_cache.into();

                        // Restore the in-memory state from the cache.
                        *self.proposed_batch.write() = proposed_batch;
                        *self.signed_proposals.write() = signed_proposals;
                        *self.propose_lock.lock().await = latest_certificate_round;

                        // Replay the pending certificates as if they arrived from a peer
                        // (`IS_SYNCING = true` bypasses the is-synced check).
                        for certificate in pending_certificates {
                            let batch_id = certificate.batch_id();
                            if let Err(err) =
                                self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                            {
                                warn!(
                                    "Failed to load stored certificate {} from proposal cache - {err}",
                                    fmt_id(batch_id)
                                );
                            }
                        }
                        Ok(())
                    }
                    Err(err) => {
                        bail!("Failed to read the signed proposals from the file system - {err}.");
                    }
                }
            }
            // No cache on disk: nothing to restore.
            false => Ok(()),
        }
    }

    /// Runs the primary: wires the BFT sender, constructs the workers, initializes
    /// and runs the sync module, restores the proposal cache, and starts the
    /// background handlers.
    ///
    /// # Panics
    /// Panics if the BFT sender has already been set.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        _primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender (at most once).
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct one worker per slot, all sharing storage/ledger/proposed batch.
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let worker = Worker::new(id, self.storage.clone(), self.ledger.clone(), self.proposed_batch.clone())?;
            workers.push(worker);
        }
        self.workers = Arc::from(workers);

        // Order matters: sync is initialized before the cached proposals are replayed,
        // and handlers start only after sync is running.
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run().await?;
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round from storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the node is synced with the ledger.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the node account.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    /// Panics if the worker count does not fit in a `u8`.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the currently-proposed batch handle.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
265
266impl<N: Network> Primary<N> {
267 pub fn num_unconfirmed_transmissions(&self) -> usize {
269 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
270 }
271
272 pub fn num_unconfirmed_ratifications(&self) -> usize {
274 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
275 }
276
277 pub fn num_unconfirmed_solutions(&self) -> usize {
279 self.workers.iter().map(|worker| worker.num_solutions()).sum()
280 }
281
282 pub fn num_unconfirmed_transactions(&self) -> usize {
284 self.workers.iter().map(|worker| worker.num_transactions()).sum()
285 }
286}
287
impl<N: Network> Primary<N> {
    /// Returns an iterator over the transmission IDs held by all workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns an iterator over the transmissions held by all workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns an iterator over the solutions held by all workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns an iterator over the transactions held by all workers.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}
309
310impl<N: Network> Primary<N> {
311 pub fn clear_worker_solutions(&self) {
313 self.workers.iter().for_each(Worker::clear_solutions);
314 }
315}
316
impl<N: Network> Primary<N> {
    /// Proposes a batch for the current round on behalf of a deterministic,
    /// fixed-seed committee of 4 accounts: the first account proposes a real batch
    /// via [`Self::propose_batch_lite`], then the remaining accounts each produce a
    /// fake (empty) proposal via [`Self::fake_proposal`] so the round can certify.
    ///
    /// NOTE(review): the 4 accounts are re-derived from a hard-coded RNG seed on
    /// every call; this is a standalone/dev-chain design — presumably `self.account`
    /// is one of these derived accounts. TODO confirm against node setup.
    pub async fn propose_batch(&self) -> Result<()> {
        // Deterministically derive the committee accounts from a fixed seed.
        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
        let mut all_acc: Vec<Account<N>> = Vec::new();

        for _ in 0u64..4u64 {
            let private_key = PrivateKey::<N>::new(&mut rng)?;
            let acc = Account::<N>::try_from(private_key).expect("Failed to initialize account with private key");
            all_acc.push(acc);
        }

        // The first derived account is the "real" proposer; the rest are signers.
        let primary_addr = all_acc[0].address();
        let other_acc: Vec<&Account<N>> = all_acc.iter().filter(|acc| acc.address() != primary_addr).collect();

        // A returned round of 0 signals "proposal was safely skipped".
        let round = self.propose_batch_lite(&other_acc).await?;
        if round == 0u64 {
            return Ok(());
        }

        // Generate a fake proposal from each of the remaining validators for the same round.
        for vid in 1..all_acc.len() {
            let primary_acc = &all_acc[vid];
            let other_acc: Vec<&Account<N>> =
                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();

            self.fake_proposal(vid.try_into().unwrap(), primary_acc, &other_acc, round).await?;
        }
        Ok(())
    }

    /// Proposes a batch for the current round, signs it with `other_acc`, and
    /// stores/certifies it. Returns the proposed round on success, or `Ok(0)` when
    /// the proposal was safely skipped (node behind, too soon, already certified,
    /// already proposed, or quorum not reachable).
    ///
    /// Holds `propose_lock` for the entire call to serialize proposing; the guard
    /// value is updated to `round` once a proposal is committed to.
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>]) -> Result<u64> {
        // Serialize proposing; the guarded value is the latest proposed round.
        let mut lock_guard = self.propose_lock.lock().await;

        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // The proposal cache may hold a round ahead of ours; do not propose behind it.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Enforce the minimum delay since the previous certificate/proposal.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
            return Ok(0u64);
        }

        // If we already certified this round, just nudge the BFT forward and skip.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(0u64),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(0u64);
        }

        // A proposal for this round is already outstanding.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            // Treat the provided signer accounts (plus ourselves) as "connected validators"
            // and require that they can reach quorum before proposing.
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();
            connected_validators.insert(self.account.address());

            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // The previous round must have reached quorum (round 0 is trivially ready).
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Drain up to an equal share of the batch capacity from each worker,
        // skipping transmissions that are already in the ledger/storage or invalid.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        let mut transmissions: IndexMap<_, _> = Default::default();
        for worker in self.workers.iter() {
            let mut num_transmissions_included_for_worker = 0;
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // The worker is exhausted; move on to the next worker.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                'inner: for (id, transmission) in worker_transmissions {
                    // Skip if already in the ledger (an error is treated as "present", i.e. skip).
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Skip if already in storage (checked only after the first insert;
                    // NOTE(review): the `!transmissions.is_empty()` guard's intent is unclear — confirm).
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Validate the transmission: checksum must match its ID, and basic checks must pass.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                                continue 'inner;
                            }
                        }
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // Mismatched ID/payload kinds are dropped.
                        _ => continue 'inner,
                    }
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        let current_timestamp = now();

        // Commit to this round: later calls for the same round will be skipped.
        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Build the batch header and proposal off the async runtime; on failure,
        // return the drained transmissions to the workers.
        let private_key = *self.account.private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        info!("Quorum threshold reached - Preparing to certify our batch for round {round}...");

        let batch_id = batch_header.batch_id();

        // Collect a signature over the batch ID from each other validator account.
        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Certify and store the batch; on failure, return its transmissions to the workers.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(round)
    }

    /// Builds and certifies an empty ("fake") batch for validator `vid` at `round`,
    /// signed by `other_acc`, stores the certificate, records our own signature in
    /// `signed_proposals`, and forwards the certificate to the BFT.
    ///
    /// # Errors
    /// Fails if `self.account`'s signature is not among the collected signatures
    /// (i.e. this node's address is not in `other_acc`).
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        // A fake proposal carries no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Build the batch header and proposal off the async runtime.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        let batch_id = batch_header.batch_id();
        // Our own signature over this batch, captured while signing below.
        let mut our_sign: Option<Signature<N>> = None;

        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // This node must have signed the fake proposal for it to be recorded below.
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Certify the proposal (blocking work on the current thread).
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        // Store the certificate.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        info!("Stored a batch certificate for validator/round {vid}/{round}");

        // Record our signature for this author, replacing any entry from an older round.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // Already signed for this round: nothing more to do.
                if entry.get().0 == round {
                    return Ok(());
                }
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        // Forward the certificate to the BFT DAG, if the sender is wired.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                warn!("Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
}
719
impl<N: Network> Primary<N> {
    /// Starts the primary's background tasks:
    /// 1. a periodic batch-proposal loop,
    /// 2. a periodic round-advancement loop,
    /// 3. a receiver loop for unconfirmed solutions,
    /// 4. a receiver loop for unconfirmed transactions.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the receiver; the peer-facing channels are intentionally unused here.
        let PrimaryReceiver {
            rx_batch_propose: _,
            rx_batch_signature: _,
            rx_batch_certified: _,
            rx_primary_ping: _,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Batch proposal loop.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // Do not propose while the node is syncing.
                if !self_.is_synced() {
                    debug!("Skipping batch proposal {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Best-effort check that no proposal is in flight; the guard is a
                // temporary and is dropped immediately — `propose_batch` re-acquires
                // the lock itself.
                if self_.propose_lock.try_lock().is_err() {
                    trace!("Skipping batch proposal {}", "(node is already proposing)".dimmed());
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Round-advancement loop: move to the next round once it reaches quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let next_round = self_.current_round().saturating_add(1);
                // Check whether the certificates already stored for the next round
                // reach the committee's quorum threshold.
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Unconfirmed solution receiver: route each solution to its worker by checksum.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                // Process each solution concurrently; the callback reports the result.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    callback.send(result).ok();
                });
            }
        });

        // Unconfirmed transaction receiver: route each transaction to its worker by checksum.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                // Process each transaction concurrently; the callback reports the result.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    callback.send(result).ok();
                });
            }
        });
    }

    /// Advances storage to `next_round`, fast-forwarding through intermediate
    /// rounds (within the GC window), notifying the BFT, and — if the BFT reports
    /// readiness — immediately proposing a batch for the new round.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // Fast-forward through intermediate rounds, but only within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Any pending proposal is stale once the round advances.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // With a BFT wired in, let it drive the round update and report readiness;
            // otherwise increment storage directly and treat the primary as ready.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

    /// Same as [`Self::try_increment_to_the_next_round`], but without proposing a
    /// batch afterwards. Used from the certification path
    /// (`store_and_broadcast_certificate_lite`) to avoid re-entering proposing.
    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
        // Fast-forward through intermediate rounds, but only within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Any pending proposal is stale once the round advances.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // With a BFT wired in, let it drive the round update and report readiness;
            // otherwise increment storage directly and treat the primary as ready.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            // Readiness is only logged here; unlike the non-lite variant, no batch is proposed.
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

        }
        Ok(())
    }

    /// Ensures `timestamp` is not before — and at least `MIN_BATCH_DELAY_IN_SECS`
    /// after — the previous certificate by `author` at `previous_round` (or, if
    /// none exists, the latest proposed batch timestamp).
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Baseline: the author's previous certificate, else our latest proposal time.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => *self.latest_proposed_batch_timestamp.read(),
        };

        // `checked_sub` rejects a timestamp earlier than the baseline.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

    /// Certifies the given proposal, stores the certificate, forwards it to the
    /// BFT, and advances to the next round (lite variant — no re-proposal).
    /// Despite the name, nothing is broadcast to peers in this standalone primary.
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        // Certify the proposal (blocking work on the current thread).
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certificate.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // Forward the certificate to the BFT DAG, if the sender is wired.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Advance without re-entering proposing (the caller handles proposing).
        self.try_increment_to_the_next_round_lite(round + 1).await
    }

    /// Returns the given transmissions to their owning workers (sharded by ID).
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Processes a batch certificate received from a peer (or replayed from the
    /// proposal cache): fetches any missing transmissions, stores the certificate,
    /// and forwards it to the BFT. Recursive because syncing a batch header may
    /// need to advance rounds, which can process further certificates.
    ///
    /// `IS_SYNCING = true` bypasses the is-synced check (used during cache replay).
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Certificates at or before the GC round are no longer relevant.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // Already stored: nothing to do.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Sync the header first; this returns the transmissions we were missing.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Re-check: the header sync above may have stored this certificate already.
        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

    /// Validates a batch header's round against GC and sync state, advances our
    /// round if we have fallen behind, and fetches the transmissions the header
    /// references that we do not yet hold.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // Headers at or before the GC round are rejected outright.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Has the batch's round already reached quorum locally?
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Advance our round if we are behind a certified round, or if the peer is
        // beyond our GC window.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Fetch whatever transmissions from this header we do not yet hold.
        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }

    /// Fetches (via the responsible worker) every transmission referenced by the
    /// batch header that is not already in storage. Returns an empty map if the
    /// batch is GC'd or already processed.
    async fn fetch_missing_transmissions(
        &self,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Nothing to fetch for GC'd rounds.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // Nothing to fetch if the batch itself is already known.
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // Fetch all missing transmissions concurrently.
        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                // Route the fetch to the worker responsible for this transmission ID.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
            }
        }

        // Collect the results; any single failure aborts the whole fetch.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }
}
1176
impl<N: Network> Primary<N> {
    /// Spawns the given future and tracks its handle so it can be aborted on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary: aborts all background tasks and, if `keep_state`
    /// is set, persists the current proposal cache to disk.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Abort every spawned background task.
        self.handles.lock().iter().for_each(|handle| handle.abort());

        if self.keep_state {
            // Snapshot the state to persist: take the pending proposal, clone the
            // signed proposals, and gather the pending certificates.
            let proposal_cache = {
                let proposal = self.proposed_batch.write().take();
                let signed_proposals = self.signed_proposals.read().clone();
                // Note: `unwrap_or` evaluates its argument eagerly, so the propose
                // lock is acquired here even when a proposal exists.
                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
                let pending_certificates = self.storage.get_pending_certificates();
                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
            };

            // Best-effort persistence: a failure is logged, not propagated.
            if let Err(err) = proposal_cache.store(self.keep_state, &self.storage_mode) {
                error!("Failed to store the current proposal cache: {err}");
            }
        }
    }
}
1205
#[cfg(test)]
mod tests {
    use super::*;
    use amareleo_node_bft_ledger_service::MockLedgerService;
    use amareleo_node_bft_storage_service::BFTMemoryService;
    use snarkvm::{
        console::types::Field,
        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
        prelude::{Address, Signature},
    };

    use bytes::Bytes;
    use indexmap::IndexSet;
    use rand::RngCore;

    type CurrentNetwork = snarkvm::prelude::MainnetV0;

    /// Constructs a `Primary` backed by a mock ledger and in-memory BFT storage,
    /// without spawning any of its background handlers.
    ///
    /// Returns the primary together with the full list of `(socket_addr, account)`
    /// committee members (the primary's own account is the first entry).
    ///
    /// NOTE(review): the RNG draw order here (account creation, then commission
    /// sampling per member) fixes the test fixtures — do not reorder.
    async fn primary_without_handlers(
        rng: &mut TestRng,
    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
        // Build a 4-member committee at round 1, each member staked at the minimum.
        let (accounts, committee) = {
            const COMMITTEE_SIZE: usize = 4;
            let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
            let mut members = IndexMap::new();

            for i in 0..COMMITTEE_SIZE {
                // Socket addresses are synthetic (127.0.0.1:5000..5003); no I/O occurs.
                let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
                let account = Account::new(rng).unwrap();
                // Member tuple: (stake, is_open, commission) — commission is randomized.
                members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
                accounts.push((socket_addr, account));
            }

            (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
        };

        // The primary runs as the first committee member.
        let account = accounts.first().unwrap().1.clone();
        let ledger = Arc::new(MockLedgerService::new(committee));
        // Storage with a garbage-collection depth of 10 rounds.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);

        // `keep_state = false`: no proposal cache is persisted on shutdown.
        let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger).unwrap();

        // Install a single worker (id 0) sharing the primary's storage, ledger,
        // and proposed-batch slot.
        primary.workers = Arc::from([Worker::new(
            0, primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
        )
        .unwrap()]);

        (primary, accounts)
    }

    /// Samples a random unconfirmed solution: a random `SolutionID` paired with
    /// an opaque buffer of 1–10 KiB of random bytes (the contents are never parsed).
    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
        let solution_id = rng.gen::<u64>().into();
        let size = rng.gen_range(1024..10 * 1024);
        let mut vec = vec![0u8; size];
        rng.fill_bytes(&mut vec);
        // Keep the payload as an unparsed buffer; tests only need IDs and checksums.
        let solution = Data::Buffer(Bytes::from(vec));
        (solution_id, solution)
    }

    /// Samples a random unconfirmed transaction: a random transaction ID paired
    /// with an opaque buffer of 1–10 KiB of random bytes.
    fn sample_unconfirmed_transaction(
        rng: &mut TestRng,
    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
        let id = Field::<CurrentNetwork>::rand(rng).into();
        let size = rng.gen_range(1024..10 * 1024);
        let mut vec = vec![0u8; size];
        rng.fill_bytes(&mut vec);
        let transaction = Data::Buffer(Bytes::from(vec));
        (id, transaction)
    }

    /// Creates a test `Proposal` for the given `author`, `committee`, `round`,
    /// and `previous_certificate_ids`, containing exactly one sampled solution
    /// and one sampled transaction.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        // Transmission IDs pair each ID with a checksum over the payload bytes.
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let private_key = author.private_key();
        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();
        // Sign the batch header with the author's key.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }

    /// Collects signatures over `batch_id` from every account except the primary
    /// itself (the batch author does not countersign its own batch).
    fn peer_signatures_for_batch(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        batch_id: Field<CurrentNetwork>,
        rng: &mut TestRng,
    ) -> IndexSet<Signature<CurrentNetwork>> {
        let mut signatures = IndexSet::new();
        for (_, account) in accounts {
            // Skip the author — only peers sign.
            if account.address() == primary_address {
                continue;
            }
            let signature = account.sign(&[batch_id], rng).unwrap();
            signatures.insert(signature);
        }
        signatures
    }

    /// Creates a `BatchCertificate` authored by `primary_address` for the given
    /// `round`, signed by all other accounts, along with the transmissions the
    /// batch references (one solution + one transaction).
    ///
    /// NOTE(review): the committee ID is a random field element here, not a real
    /// committee's ID — sufficient for storage-level tests that don't verify it.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Look up the authoring account by address.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        let committee_id = Field::rand(rng);
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Peers countersign the batch ID to form the certificate.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }

    /// Populates the primary's storage with a full certificate chain for rounds
    /// `1..round`: one certificate per committee member per round, each linked to
    /// the previous round's certificates. Advances storage to the next round after
    /// each round. Returns the certificate IDs of round `round - 1`.
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            // Move storage forward so the next round's certificates are accepted.
            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

    /// A batch proposal succeeds when the worker holds pending transmissions.
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // Sanity: no proposal exists yet.
        assert!(primary.proposed_batch.read().is_none());

        // Queue one solution and one transaction in the worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());

    }

    /// Proposing with an empty worker queue is still `Ok` (no-op proposal path).
    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        assert!(primary.proposed_batch.read().is_none());

        assert!(primary.propose_batch().await.is_ok());

    }

    /// A batch proposal succeeds after the storage has advanced past round 1
    /// via a pre-built certificate chain.
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Fill storage with certificates for rounds 1..3.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Wait out the minimum batch delay so the proposal is not rate-limited.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        assert!(primary.propose_batch().await.is_ok());

    }

    /// When the propose lock is ahead of the storage round, `propose_batch`
    /// returns `Ok` but produces no proposal; restoring the lock allows
    /// proposing again.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        assert!(primary.proposed_batch.read().is_none());

        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Force the propose lock ahead of the storage round.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Proposal is skipped (Ok, but nothing proposed) while the lock is ahead.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the lock; proposing should now proceed.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        assert!(primary.propose_batch().await.is_ok());

    }

    /// When an existing proposal is ahead of the storage round, `propose_batch`
    /// keeps that proposal rather than replacing it with an older-round one.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Install a proposal for `round + 1`, ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            &primary.account,
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        *primary.proposed_batch.write() = Some(proposal);

        // The ahead-of-round proposal must survive the propose call.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
}