1use crate::{
17 DEVELOPMENT_MODE_RNG_SEED,
18 MAX_BATCH_DELAY_IN_MS,
19 MAX_WORKERS,
20 MIN_BATCH_DELAY_IN_SECS,
21 Sync,
22 Worker,
23 helpers::{
24 BFTSender,
25 PrimaryReceiver,
26 PrimarySender,
27 Proposal,
28 ProposalCache,
29 SignedProposals,
30 Storage,
31 assign_to_worker,
32 assign_to_workers,
33 fmt_id,
34 now,
35 },
36 spawn_blocking,
37};
38use amareleo_chain_account::Account;
39use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_sync::DUMMY_SELF_IP;
42use snarkvm::{
43 console::{prelude::*, types::Address},
44 ledger::{
45 block::Transaction,
46 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
47 puzzle::{Solution, SolutionID},
48 },
49 prelude::{Signature, committee::Committee},
50};
51
52use aleo_std::StorageMode;
53use colored::Colorize;
54use futures::stream::{FuturesUnordered, StreamExt};
55use indexmap::IndexMap;
56use parking_lot::{Mutex, RwLock};
57
58use rand::SeedableRng;
59use rand_chacha::ChaChaRng;
60use snarkvm::console::account::PrivateKey;
61
62use std::{
63 collections::{HashMap, HashSet},
64 future::Future,
65 net::SocketAddr,
66 sync::{
67 Arc,
68 atomic::{AtomicBool, Ordering},
69 },
70 time::Duration,
71};
72use tokio::{
73 sync::{Mutex as TMutex, OnceCell},
74 task::JoinHandle,
75};
76use tracing::subscriber::DefaultGuard;
77
/// The currently proposed batch (if any), guarded by a read-write lock.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
80
/// The primary of the memory pool: owns the workers, the batch-proposal state,
/// and the channel to the BFT layer. Cloning is cheap; shared state is behind `Arc`s.
#[derive(Clone)]
pub struct Primary<N: Network> {
    // The sync module, used for certificate/block synchronization.
    sync: Sync<N>,
    // The account (key pair) of this validator.
    account: Account<N>,
    // The storage holding rounds, certificates, and transmissions.
    storage: Storage<N>,
    // Whether node state should be persisted on shutdown.
    keep_state: bool,
    // Set once handlers run with `keep_state` enabled; consumed by `shut_down`.
    save_pending: Arc<AtomicBool>,
    // The storage mode, used to locate the on-disk proposal cache.
    storage_mode: StorageMode,
    // The ledger service, used for committee lookbacks and transmission checks.
    ledger: Arc<dyn LedgerService<N>>,
    // The workers holding unconfirmed transmissions, sharded by transmission ID.
    workers: Arc<[Worker<N>]>,
    // The sender to the BFT layer; set at most once in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    // The currently proposed batch, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    // The timestamp of the latest batch proposed by this primary.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    // The signatures this primary has issued for proposals, keyed by author address.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    // Handles of spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    // Optional tracing handler used to scope log output.
    tracing: Option<TracingHandler>,
    // The highest round proposed (or restored from the cache); guards duplicate proposals.
    propose_lock: Arc<TMutex<u64>>,
}
114
115impl<N: Network> TracingHandlerGuard for Primary<N> {
116 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
118 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
119 }
120}
121
122impl<N: Network> Primary<N> {
    /// The tolerance applied on top of the per-batch transmission limit (twice the per-batch cap).
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
125
    /// Initializes a new primary instance.
    ///
    /// Workers are left empty here; they are constructed in `run`.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        keep_state: bool,
        storage_mode: StorageMode,
        ledger: Arc<dyn LedgerService<N>>,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Initialize the sync module over the shared storage and ledger.
        let sync = Sync::new(storage.clone(), ledger.clone());

        Ok(Self {
            sync,
            account,
            storage,
            keep_state,
            save_pending: Arc::new(AtomicBool::new(false)),
            storage_mode,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            tracing,
            propose_lock: Default::default(),
        })
    }
157
158 async fn load_proposal_cache(&self) -> Result<()> {
160 match ProposalCache::<N>::exists(&self.storage_mode) {
162 true => {
164 match ProposalCache::<N>::load(self.account.address(), &self.storage_mode, self) {
165 Ok(proposal_cache) => {
166 let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
168 proposal_cache.into();
169
170 *self.proposed_batch.write() = proposed_batch;
172 *self.signed_proposals.write() = signed_proposals;
174 *self.propose_lock.lock().await = latest_certificate_round;
176
177 for certificate in pending_certificates {
179 let batch_id = certificate.batch_id();
180 if let Err(err) =
184 self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
185 {
186 guard_warn!(
187 self,
188 "Failed to load stored certificate {} from proposal cache - {err}",
189 fmt_id(batch_id)
190 );
191 }
192 }
193 Ok(())
194 }
195 Err(err) => {
196 bail!("Failed to read the signed proposals from the file system - {err}.");
197 }
198 }
199 }
200 false => Ok(()),
202 }
203 }
204
    /// Starts the primary: registers the BFT sender, constructs the workers,
    /// initializes and runs the sync module, restores the proposal cache, and
    /// spawns the background handlers.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        _primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        guard_info!(self, "Starting the primary instance of the memory pool...");

        // Register the BFT sender; it may only be set once.
        if let Some(bft_sender) = &bft_sender {
            if self.bft_sender.set(bft_sender.clone()).is_err() {
                guard_error!(self, "Unexpected: BFT sender already set");
                bail!("Unexpected: BFT sender already set");
            }
        }

        // Construct one worker per ID, sharing storage, ledger, and the proposed batch.
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let worker = Worker::new(
                id,
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
                self.tracing.clone(),
            )?;

            workers.push(worker);
        }
        self.workers = Arc::from(workers);

        // Initialize sync, replay the proposal cache, then start sync.
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run().await?;

        // Spawn the background proposal/round/intake handlers.
        self.start_handlers(primary_receiver);

        // Mark that state should be persisted on shutdown, if requested.
        if self.keep_state {
            self.save_pending.store(true, Ordering::Relaxed);
        }

        Ok(())
    }
260
    /// Returns the current round from storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }
265
    /// Returns `true` if the sync module reports the node as synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }
270
    /// Returns the primary's account.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }
275
    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }
280
    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
285
    /// Returns the number of workers; panics if the count exceeds `u8::MAX`.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }
290}
291
292impl<N: Network> Primary<N> {
293 pub fn num_unconfirmed_transmissions(&self) -> usize {
295 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
296 }
297
298 pub fn num_unconfirmed_ratifications(&self) -> usize {
300 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
301 }
302
303 pub fn num_unconfirmed_solutions(&self) -> usize {
305 self.workers.iter().map(|worker| worker.num_solutions()).sum()
306 }
307
308 pub fn num_unconfirmed_transactions(&self) -> usize {
310 self.workers.iter().map(|worker| worker.num_transactions()).sum()
311 }
312}
313
314impl<N: Network> Primary<N> {
315 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
317 self.workers.iter().flat_map(|worker| worker.transmission_ids())
318 }
319
320 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
322 self.workers.iter().flat_map(|worker| worker.transmissions())
323 }
324
325 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
327 self.workers.iter().flat_map(|worker| worker.solutions())
328 }
329
330 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
332 self.workers.iter().flat_map(|worker| worker.transactions())
333 }
334}
335
336impl<N: Network> Primary<N> {
337 pub fn clear_worker_solutions(&self) {
339 self.workers.iter().for_each(Worker::clear_solutions);
340 }
341}
342
343impl<N: Network> Primary<N> {
344 pub async fn propose_batch(&self) -> Result<()> {
345 let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
346 let mut all_acc: Vec<Account<N>> = Vec::new();
347 for _ in 0u64..4u64 {
348 let private_key = PrivateKey::<N>::new(&mut rng)?;
349 let acc = Account::<N>::try_from(private_key)?;
350 all_acc.push(acc);
351 }
352
353 let other_acc: Vec<&Account<N>> = all_acc.iter().skip(1).collect();
355 let round = self.propose_batch_lite(&other_acc).await?;
356 if round == 0u64 {
357 return Ok(());
358 }
359
360 for vid in 1u64..4u64 {
362 let primary_acc = &all_acc[vid as usize];
363 let other_acc: Vec<&Account<N>> =
364 all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();
365
366 self.fake_proposal(vid, primary_acc, &other_acc, round).await?;
367 }
368 Ok(())
369 }
370
    /// Proposes a batch for the current round, signed by `other_acc`, and
    /// stores/broadcasts the resulting certificate.
    ///
    /// Returns the proposed round on success, or `Ok(0)` when the proposal was
    /// safely skipped (round already proposed or certified, quorum not reached,
    /// or the minimum batch delay not yet elapsed).
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>]) -> Result<u64> {
        // Hold the propose lock for the whole proposal to serialize proposals.
        let mut lock_guard = self.propose_lock.lock().await;

        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // Skip if the proposal cache already covers a later round.
        if round < *lock_guard {
            guard_warn!(
                self,
                "Cannot propose a batch for round {round} - the latest proposal cache round is {}",
                *lock_guard
            );
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Enforce the minimum delay since our previous certificate/proposal.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} - {}",
                format!("{e}").dimmed()
            );
            return Ok(0u64);
        }

        // If we already certified this round, nudge the BFT forward and skip.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(0u64),
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            guard_debug!(
                self,
                "Primary is safely skipping {}",
                format!("(round {round} was already certified)").dimmed()
            );
            return Ok(0u64);
        }

        // Skip if this round was already proposed (lock records the last proposed round).
        if round == *lock_guard {
            guard_warn!(self, "Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            // Treat the provided signer accounts (plus ourselves) as the connected validators.
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            connected_validators.insert(self.account.address());

            // The connected validators must reach the committee's quorum threshold.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                guard_debug!(
                    self,
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                guard_trace!(self, "Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // The previous round must have reached quorum (round 0 is trivially ready).
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Drain transmissions from the workers, splitting the batch budget evenly.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        let mut transmissions: IndexMap<_, _> = Default::default();
        for worker in self.workers.iter() {
            let mut num_transmissions_included_for_worker = 0;
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // Stop once the worker has nothing left to drain.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                'inner: for (id, transmission) in worker_transmissions {
                    // Skip transmissions already in the ledger (errors treated as "present").
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Validate the checksum and run the basic ledger checks per transmission type.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    guard_trace!(
                                        self,
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                guard_trace!(self, "Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    guard_trace!(
                                        self,
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                guard_trace!(
                                    self,
                                    "Proposing - Skipping transaction '{}' - {e}",
                                    fmt_id(transaction_id)
                                );
                                continue 'inner;
                            }
                        }
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // Mismatched ID/transmission pairs are dropped.
                        _ => continue 'inner,
                    }
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        let current_timestamp = now();

        // Record the round as proposed before building the batch header.
        *lock_guard = round;

        guard_info!(self, "Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        let private_key = *self.account.private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Build the batch header and proposal off the async executor; on failure,
        // return the drained transmissions to the workers.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                guard_error!(self, "Failed to reinsert transmissions: {e:?}");
            }
        })?;
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        guard_info!(self, "Quorum threshold reached - Preparing to certify our batch for round {round}...");

        let batch_id = batch_header.batch_id();

        // Collect a signature over the batch ID from every other validator account.
        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Store and broadcast the certificate; on failure, return the transmissions.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(round)
    }
657
    /// Builds and certifies an empty (fake) proposal for development-mode
    /// validator `vid` at `round`, signed by `other_acc`, storing the resulting
    /// certificate and recording our own signature in `signed_proposals`.
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        // Fake proposals carry no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Build the batch header and proposal off the async executor.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        let batch_id = batch_header.batch_id();
        // Our own signature (this node's account), captured while signing below.
        let mut our_sign: Option<Signature<N>> = None;

        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // This node must be among the signers for the signed-proposals bookkeeping.
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Certificate aggregation is CPU-bound, so block in place.
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_info!(self, "Stored a batch certificate for validator/round {vid}/{round}");

        // Record our signature for this author, unless this round was already recorded.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                if entry.get().0 == round {
                    return Ok(());
                }
                entry.insert((round, batch_id, our_sign));
                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((round, batch_id, our_sign));
                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        // Feed the certificate into the BFT DAG, if the BFT is attached.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
761}
762
763impl<N: Network> Primary<N> {
    /// Spawns the background tasks: the batch-proposal loop, the round-increment
    /// loop, and the unconfirmed solution/transaction intake loops.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver { mut rx_unconfirmed_solution, mut rx_unconfirmed_transaction } = primary_receiver;

        // Loop 1: periodically attempt to propose a batch for the current round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // Do not propose while the node is still syncing.
                if !self_.is_synced() {
                    guard_debug!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is syncing)".dimmed()
                    );
                    continue;
                }
                // Skip if a proposal is already in progress (propose lock held elsewhere).
                if self_.propose_lock.try_lock().is_err() {
                    guard_trace!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    guard_warn!(self_, "Cannot propose a batch - {e}");
                }
            }
        });

        // Loop 2: periodically advance to the next round once it reaches quorum.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.is_synced() {
                    guard_trace!(self_, "Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let next_round = self_.current_round().saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // No certificates yet for the next round: nothing to do.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        guard_warn!(self_, "Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    guard_debug!(self_, "Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        guard_warn!(self_, "Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Loop 3: route each unconfirmed solution to its worker; report via callback.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Shard by (solution ID, checksum) to pick the responsible worker.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };

                let worker = &self_.workers[worker_id as usize];
                let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                // Callback failures (dropped receivers) are ignored.
                callback.send(result).ok();
            }
        });

        // Loop 4: route each unconfirmed transaction to its worker; report via callback.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                guard_trace!(self_, "Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };

                let worker = &self_.workers[worker_id as usize];
                let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                callback.send(result).ok();
            }
        });
    }
891
    /// Advances storage toward `next_round`, notifying the BFT (when attached)
    /// and proposing a new batch once the primary is ready.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // Fast-forward through intermediate rounds while within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear any stale proposal from the skipped round.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // With a BFT attached, let it decide readiness; otherwise increment directly.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => guard_debug!(self, "Primary is ready to propose the next round"),
                false => guard_debug!(self, "Primary is not ready to propose the next round"),
            }

            // Immediately propose a batch for the new round when ready.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
941
    /// Advances storage toward `next_round`, notifying the BFT (when attached).
    ///
    /// NOTE(review): near-duplicate of `try_increment_to_the_next_round`, except
    /// it does NOT call `propose_batch` afterward; `is_ready` is only logged here.
    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
        // Fast-forward through intermediate rounds while within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear any stale proposal from the skipped round.
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // With a BFT attached, let it decide readiness; otherwise increment directly.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => guard_debug!(self, "Primary is ready to propose the next round"),
                false => guard_debug!(self, "Primary is not ready to propose the next round"),
            }

        }
        Ok(())
    }
991
992 fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
995 let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
997 Some(certificate) => certificate.timestamp(),
999 None => *self.latest_proposed_batch_timestamp.read(),
1000 };
1001
1002 let elapsed = timestamp
1004 .checked_sub(previous_timestamp)
1005 .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1006 match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1008 true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1009 false => Ok(()),
1010 }
1011 }
1012
    /// Converts the proposal into a certificate, stores it, forwards it to the
    /// BFT (when attached), and advances to the round after the certified one.
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        // Certificate aggregation is CPU-bound, so block in place.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_debug!(self, "Stored a batch certificate for round {}", certificate.round());
        // Feed the certificate into the BFT DAG, if the BFT is attached.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                guard_warn!(self, "Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        guard_info!(self, "\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Move on to the round after the certified one.
        self.try_increment_to_the_next_round_lite(round + 1).await
    }
1043
    /// Returns the given transmissions to the workers, sharding them by transmission ID.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }
1055
    /// Stores the given certificate (after fetching any missing transmissions)
    /// and forwards it to the BFT. `IS_SYNCING` bypasses the is-synced guard.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Ignore certificates that are already garbage-collected or stored.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // Process the header first; this returns any transmissions we were missing.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Only insert if the certificate is still absent after header processing.
        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            guard_debug!(self, "Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // Feed the certificate into the BFT DAG, if the BFT is attached.
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }
1113
    /// Validates a batch header from a peer, catches the primary up if it has
    /// fallen behind, and returns the transmissions storage is missing.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // Reject rounds that have already been garbage-collected.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine whether the batch round has already reached quorum.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Catch up if we're behind schedule or the peer is beyond our GC window.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }
1163
    /// Returns the transmissions referenced by the batch header that are not yet
    /// in storage, fetching them concurrently from the responsible workers.
    async fn fetch_missing_transmissions(
        &self,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Nothing to fetch for garbage-collected rounds.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // Nothing to fetch if this batch was already processed.
        if self.storage.contains_batch(batch_header.batch_id()) {
            guard_trace!(self, "Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // One in-flight fetch per missing transmission, polled as a set.
        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                // Shard the fetch to the worker responsible for this transmission ID.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
            }
        }

        // Collect the fetched transmissions as they complete; any failure aborts.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }
1216}
1217
1218impl<N: Network> Primary<N> {
1219 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1221 self.handles.lock().push(tokio::spawn(future));
1222 }
1223
    /// Shuts down the primary: aborts all background tasks and, if persistence
    /// was requested, stores the current proposal cache to disk.
    pub async fn shut_down(&self) {
        guard_info!(self, "Shutting down the primary...");
        // Abort and clear all spawned background tasks.
        {
            let mut handles = self.handles.lock();
            handles.iter().for_each(|handle| handle.abort());
            handles.clear();
        }

        // Swap the flag so the cache is persisted at most once.
        let save_pending = self.save_pending.swap(false, Ordering::AcqRel);
        if save_pending {
            let proposal_cache = {
                let proposal = self.proposed_batch.write().take();
                let signed_proposals = self.signed_proposals.read().clone();
                // Prefer the in-flight proposal's round; otherwise the propose lock's round.
                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
                let pending_certificates = self.storage.get_pending_certificates();
                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
            };

            if let Err(err) = proposal_cache.store(&self.storage_mode, self) {
                guard_error!(self, "Failed to store the current proposal cache: {err}");
            }
        }
    }
1252}
1253
1254#[cfg(test)]
1255mod tests {
1256 use super::*;
1257 use amareleo_node_bft_ledger_service::MockLedgerService;
1258 use amareleo_node_bft_storage_service::BFTMemoryService;
1259 use snarkvm::{
1260 console::types::Field,
1261 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1262 prelude::{Address, Signature},
1263 };
1264
1265 use bytes::Bytes;
1266 use indexmap::IndexSet;
1267 use rand::RngCore;
1268
1269 type CurrentNetwork = snarkvm::prelude::MainnetV0;
1270
1271 async fn primary_without_handlers(
1273 rng: &mut TestRng,
1274 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1275 let (accounts, committee) = {
1277 const COMMITTEE_SIZE: usize = 4;
1278 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1279 let mut members = IndexMap::new();
1280
1281 for i in 0..COMMITTEE_SIZE {
1282 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1283 let account = Account::new(rng).unwrap();
1284 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1285 accounts.push((socket_addr, account));
1286 }
1287
1288 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1289 };
1290
1291 let account = accounts.first().unwrap().1.clone();
1292 let ledger = Arc::new(MockLedgerService::new(committee));
1293 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1294
1295 let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger, None).unwrap();
1297
1298 primary.workers = Arc::from([Worker::new(
1300 0, primary.storage.clone(),
1302 primary.ledger.clone(),
1303 primary.proposed_batch.clone(),
1304 None,
1305 )
1306 .unwrap()]);
1307
1308 (primary, accounts)
1309 }
1310
1311 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1313 let solution_id = rng.gen::<u64>().into();
1315 let size = rng.gen_range(1024..10 * 1024);
1317 let mut vec = vec![0u8; size];
1319 rng.fill_bytes(&mut vec);
1320 let solution = Data::Buffer(Bytes::from(vec));
1321 (solution_id, solution)
1323 }
1324
1325 fn sample_unconfirmed_transaction(
1327 rng: &mut TestRng,
1328 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1329 let id = Field::<CurrentNetwork>::rand(rng).into();
1331 let size = rng.gen_range(1024..10 * 1024);
1333 let mut vec = vec![0u8; size];
1335 rng.fill_bytes(&mut vec);
1336 let transaction = Data::Buffer(Bytes::from(vec));
1337 (id, transaction)
1339 }
1340
1341 fn create_test_proposal(
1343 author: &Account<CurrentNetwork>,
1344 committee: Committee<CurrentNetwork>,
1345 round: u64,
1346 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1347 timestamp: i64,
1348 rng: &mut TestRng,
1349 ) -> Proposal<CurrentNetwork> {
1350 let (solution_id, solution) = sample_unconfirmed_solution(rng);
1351 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1352 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1353 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1354
1355 let solution_transmission_id = (solution_id, solution_checksum).into();
1356 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1357
1358 let private_key = author.private_key();
1360 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1362 let transmissions = [
1363 (solution_transmission_id, Transmission::Solution(solution)),
1364 (transaction_transmission_id, Transmission::Transaction(transaction)),
1365 ]
1366 .into();
1367 let batch_header = BatchHeader::new(
1369 private_key,
1370 round,
1371 timestamp,
1372 committee.id(),
1373 transmission_ids,
1374 previous_certificate_ids,
1375 rng,
1376 )
1377 .unwrap();
1378 Proposal::new(committee, batch_header, transmissions).unwrap()
1380 }
1381
1382 fn peer_signatures_for_batch(
1384 primary_address: Address<CurrentNetwork>,
1385 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1386 batch_id: Field<CurrentNetwork>,
1387 rng: &mut TestRng,
1388 ) -> IndexSet<Signature<CurrentNetwork>> {
1389 let mut signatures = IndexSet::new();
1390 for (_, account) in accounts {
1391 if account.address() == primary_address {
1392 continue;
1393 }
1394 let signature = account.sign(&[batch_id], rng).unwrap();
1395 signatures.insert(signature);
1396 }
1397 signatures
1398 }
1399
1400 fn create_batch_certificate(
1402 primary_address: Address<CurrentNetwork>,
1403 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1404 round: u64,
1405 previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1406 rng: &mut TestRng,
1407 ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
1408 let timestamp = now();
1409
1410 let author =
1411 accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
1412 let private_key = author.private_key();
1413
1414 let committee_id = Field::rand(rng);
1415 let (solution_id, solution) = sample_unconfirmed_solution(rng);
1416 let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1417 let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1418 let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1419
1420 let solution_transmission_id = (solution_id, solution_checksum).into();
1421 let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1422
1423 let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1424 let transmissions = [
1425 (solution_transmission_id, Transmission::Solution(solution)),
1426 (transaction_transmission_id, Transmission::Transaction(transaction)),
1427 ]
1428 .into();
1429
1430 let batch_header = BatchHeader::new(
1431 private_key,
1432 round,
1433 timestamp,
1434 committee_id,
1435 transmission_ids,
1436 previous_certificate_ids,
1437 rng,
1438 )
1439 .unwrap();
1440 let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
1441 let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
1442 (certificate, transmissions)
1443 }
1444
1445 fn store_certificate_chain(
1447 primary: &Primary<CurrentNetwork>,
1448 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1449 round: u64,
1450 rng: &mut TestRng,
1451 ) -> IndexSet<Field<CurrentNetwork>> {
1452 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1453 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1454 for cur_round in 1..round {
1455 for (_, account) in accounts.iter() {
1456 let (certificate, transmissions) = create_batch_certificate(
1457 account.address(),
1458 accounts,
1459 cur_round,
1460 previous_certificates.clone(),
1461 rng,
1462 );
1463 next_certificates.insert(certificate.id());
1464 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1465 }
1466
1467 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1468 previous_certificates = next_certificates;
1469 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1470 }
1471
1472 previous_certificates
1473 }
1474
1475 #[tokio::test]
1476 async fn test_propose_batch() {
1477 let mut rng = TestRng::default();
1478 let (primary, _) = primary_without_handlers(&mut rng).await;
1479
1480 assert!(primary.proposed_batch.read().is_none());
1482
1483 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1485 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1486
1487 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1489 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1490
1491 assert!(primary.propose_batch().await.is_ok());
1493
1494 }
1497
1498 #[tokio::test]
1499 async fn test_propose_batch_with_no_transmissions() {
1500 let mut rng = TestRng::default();
1501 let (primary, _) = primary_without_handlers(&mut rng).await;
1502
1503 assert!(primary.proposed_batch.read().is_none());
1505
1506 assert!(primary.propose_batch().await.is_ok());
1508
1509 }
1512
1513 #[tokio::test]
1514 async fn test_propose_batch_in_round() {
1515 let round = 3;
1516 let mut rng = TestRng::default();
1517 let (primary, accounts) = primary_without_handlers(&mut rng).await;
1518
1519 store_certificate_chain(&primary, &accounts, round, &mut rng);
1521
1522 tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
1524
1525 let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1527 let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1528
1529 primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1531 primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1532
1533 assert!(primary.propose_batch().await.is_ok());
1535
1536 }
1539
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // The primary starts without a proposed batch.
        assert!(primary.proposed_batch.read().is_none());

        // Queue one solution and one transaction on the worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Artificially advance the propose lock past the storage round,
        // remembering the original value so it can be restored below.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Proposing returns Ok but produces no batch while the lock round is
        // ahead of storage.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the lock round; proposing should now proceed.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        assert!(primary.propose_batch().await.is_ok());
    }
1574
1575 #[tokio::test]
1576 async fn test_propose_batch_with_storage_round_behind_proposal() {
1577 let round = 5;
1578 let mut rng = TestRng::default();
1579 let (primary, accounts) = primary_without_handlers(&mut rng).await;
1580
1581 let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
1583
1584 let timestamp = now();
1586 let proposal = create_test_proposal(
1587 &primary.account,
1588 primary.ledger.current_committee().unwrap(),
1589 round + 1,
1590 previous_certificates,
1591 timestamp,
1592 &mut rng,
1593 );
1594
1595 *primary.proposed_batch.write() = Some(proposal);
1597
1598 assert!(primary.propose_batch().await.is_ok());
1600 assert!(primary.proposed_batch.read().is_some());
1601 assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
1602 }
1603}