1use crate::{
17 DEVELOPMENT_MODE_RNG_SEED,
18 MAX_BATCH_DELAY_IN_MS,
19 MAX_WORKERS,
20 MIN_BATCH_DELAY_IN_SECS,
21 Sync,
22 Worker,
23 helpers::{
24 BFTSender,
25 PrimaryReceiver,
26 PrimarySender,
27 Proposal,
28 ProposalCache,
29 SignedProposals,
30 Storage,
31 assign_to_worker,
32 assign_to_workers,
33 fmt_id,
34 now,
35 },
36 spawn_blocking,
37};
38use amareleo_chain_account::Account;
39use amareleo_chain_tracing::TracingHandler;
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_sync::DUMMY_SELF_IP;
42use snarkvm::{
43 console::{prelude::*, types::Address},
44 ledger::{
45 block::Transaction,
46 narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
47 puzzle::{Solution, SolutionID},
48 },
49 prelude::{Signature, committee::Committee},
50};
51
52use aleo_std::StorageMode;
53use colored::Colorize;
54use futures::stream::{FuturesUnordered, StreamExt};
55use indexmap::IndexMap;
56use parking_lot::{Mutex, RwLock};
57
58use rand::SeedableRng;
59use rand_chacha::ChaChaRng;
60use snarkvm::console::account::PrivateKey;
61
62use std::{
63 collections::{HashMap, HashSet},
64 future::Future,
65 net::SocketAddr,
66 sync::Arc,
67 time::Duration,
68};
69use tokio::{
70 sync::{Mutex as TMutex, OnceCell},
71 task::JoinHandle,
72};
73use tracing::subscriber::DefaultGuard;
74
/// The currently proposed batch, if any, shared behind a reader-writer lock.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
77
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, used to initialize and run block synchronization.
    sync: Sync<N>,
    /// This node's account (address, private key) used to author and sign batches.
    account: Account<N>,
    /// The storage holding rounds, certificates, and transmissions.
    storage: Storage<N>,
    /// If `true`, the proposal cache is persisted to disk on shutdown.
    keep_state: bool,
    /// The storage mode, used to locate the on-disk proposal cache.
    storage_mode: StorageMode,
    /// The ledger service used for committee lookbacks and transmission checks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers that buffer unconfirmed solutions and transactions (populated in `run`).
    workers: Arc<[Worker<N>]>,
    /// The sender to the BFT, set at most once in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The currently proposed batch, if any.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recently proposed batch (fallback for proposal pacing).
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The proposals this node has signed, keyed by author address.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// Join handles of spawned background tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The optional custom tracing handler; `None` uses the default subscriber.
    tracing: Option<TracingHandler>,
    /// Async lock holding the latest proposed (or cached) round; also serializes proposing.
    propose_lock: Arc<TMutex<u64>>,
}
109
impl<N: Network> Primary<N> {
    /// The maximum number of transmissions tolerated: twice the per-batch maximum.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary with an empty worker set.
    ///
    /// Workers are created later in [`Self::run`]; until then `workers` is empty.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        keep_state: bool,
        storage_mode: StorageMode,
        ledger: Arc<dyn LedgerService<N>>,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Initialize the sync module with clones of the storage and ledger handles.
        let sync = Sync::new(storage.clone(), ledger.clone());

        Ok(Self {
            sync,
            account,
            storage,
            keep_state,
            storage_mode,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            tracing,
            propose_lock: Default::default(),
        })
    }

    /// Subscribes the current thread to the custom tracing handler, if one is set.
    /// The returned guard must be kept alive for the subscription to remain active.
    pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
        self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
    }

    /// Restores state from the on-disk proposal cache, if it exists.
    ///
    /// On success this repopulates the proposed batch, the signed proposals, and the
    /// propose-lock round, then re-processes each pending certificate through the
    /// sync path (failures there are logged, not fatal). Returns an error only if
    /// the cache exists but cannot be loaded.
    async fn load_proposal_cache(&self) -> Result<()> {
        let _guard = self.get_tracing_guard();

        match ProposalCache::<N>::exists(&self.storage_mode) {
            true => {
                match ProposalCache::<N>::load(self.account.address(), &self.storage_mode, self.tracing.clone()) {
                    Ok(proposal_cache) => {
                        // Unpack the cache into its constituent parts.
                        let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                            proposal_cache.into();

                        *self.proposed_batch.write() = proposed_batch;
                        *self.signed_proposals.write() = signed_proposals;
                        *self.propose_lock.lock().await = latest_certificate_round;

                        // Replay pending certificates as if received from ourselves (DUMMY_SELF_IP),
                        // with IS_SYNCING = true to bypass the "node is syncing" guard.
                        for certificate in pending_certificates {
                            let batch_id = certificate.batch_id();
                            if let Err(err) =
                                self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                            {
                                warn!(
                                    "Failed to load stored certificate {} from proposal cache - {err}",
                                    fmt_id(batch_id)
                                );
                            }
                        }
                        Ok(())
                    }
                    Err(err) => {
                        bail!("Failed to read the signed proposals from the file system - {err}.");
                    }
                }
            }
            // No cache on disk: nothing to restore.
            false => Ok(()),
        }
    }

    /// Starts the primary: wires the BFT sender, creates the workers, initializes and
    /// runs sync, restores the proposal cache, and spawns the background handlers.
    ///
    /// NOTE(review): `_primary_sender` is accepted but unused here — presumably kept
    /// for interface compatibility with callers; confirm before removing.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        _primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender exactly once; a second call to `run` would panic here.
        if let Some(bft_sender) = &bft_sender {
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct the fixed set of workers.
        let mut workers = Vec::new();
        for id in 0..MAX_WORKERS {
            let worker = Worker::new(
                id,
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
                self.tracing.clone(),
            )?;

            workers.push(worker);
        }
        self.workers = Arc::from(workers);

        // Order matters: sync must be initialized before the cache is replayed,
        // and handlers start only after sync is running.
        self.sync.initialize(bft_sender).await?;
        self.load_proposal_cache().await?;
        self.sync.run().await?;
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round from storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the sync module reports the node as synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns this node's account.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers; panics if the count exceeds `u8::MAX`.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the currently proposed batch.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
286
287impl<N: Network> Primary<N> {
288 pub fn num_unconfirmed_transmissions(&self) -> usize {
290 self.workers.iter().map(|worker| worker.num_transmissions()).sum()
291 }
292
293 pub fn num_unconfirmed_ratifications(&self) -> usize {
295 self.workers.iter().map(|worker| worker.num_ratifications()).sum()
296 }
297
298 pub fn num_unconfirmed_solutions(&self) -> usize {
300 self.workers.iter().map(|worker| worker.num_solutions()).sum()
301 }
302
303 pub fn num_unconfirmed_transactions(&self) -> usize {
305 self.workers.iter().map(|worker| worker.num_transactions()).sum()
306 }
307}
308
309impl<N: Network> Primary<N> {
310 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
312 self.workers.iter().flat_map(|worker| worker.transmission_ids())
313 }
314
315 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
317 self.workers.iter().flat_map(|worker| worker.transmissions())
318 }
319
320 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
322 self.workers.iter().flat_map(|worker| worker.solutions())
323 }
324
325 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
327 self.workers.iter().flat_map(|worker| worker.transactions())
328 }
329}
330
331impl<N: Network> Primary<N> {
332 pub fn clear_worker_solutions(&self) {
334 self.workers.iter().for_each(Worker::clear_solutions);
335 }
336}
337
impl<N: Network> Primary<N> {
    /// Proposes a batch for the current round, simulating a full committee.
    ///
    /// Deterministically derives 4 accounts from `DEVELOPMENT_MODE_RNG_SEED`, proposes the
    /// real batch as validator 0 via [`Self::propose_batch_lite`], then fabricates proposals
    /// for the remaining validators at the same round via [`Self::fake_proposal`].
    ///
    /// NOTE(review): this assumes `self.account` corresponds to one of the derived accounts
    /// (validator 0) — `fake_proposal` relies on finding our address among the signers; confirm.
    pub async fn propose_batch(&self) -> Result<()> {
        // Seeded RNG: the same accounts are derived on every call.
        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
        let mut all_acc: Vec<Account<N>> = Vec::new();

        for _ in 0u64..4u64 {
            let private_key = PrivateKey::<N>::new(&mut rng)?;
            let acc = Account::<N>::try_from(private_key).expect("Failed to initialize account with private key");
            all_acc.push(acc);
        }

        // Validator 0 is the "real" proposer; the rest act as its co-signers.
        let primary_addr = all_acc[0].address();
        let other_acc: Vec<&Account<N>> = all_acc.iter().filter(|acc| acc.address() != primary_addr).collect();

        // A returned round of 0 means the proposal was (safely) skipped.
        let round = self.propose_batch_lite(&other_acc).await?;
        if round == 0u64 {
            return Ok(());
        }

        // Fabricate one proposal per remaining validator for the same round.
        for vid in 1..all_acc.len() {
            let primary_acc = &all_acc[vid];
            let other_acc: Vec<&Account<N>> =
                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();

            self.fake_proposal(vid.try_into().unwrap(), primary_acc, &other_acc, round).await?;
        }
        Ok(())
    }

    /// Proposes, signs, stores, and certifies a batch for the current round.
    ///
    /// Returns the proposed round on success, or `Ok(0)` when the proposal is safely
    /// skipped (stale round, pacing, already certified/proposed, quorum not reached).
    /// Holds `propose_lock` for the entire duration, which also serializes proposing.
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>]) -> Result<u64> {
        // This lock guard is held for the whole function; it stores the latest proposed round.
        let mut lock_guard = self.propose_lock.lock().await;
        let _guard = self.get_tracing_guard();

        let round = self.current_round();
        let previous_round = round.saturating_sub(1);

        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // Skip if the cached proposal round is ahead of the current round.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Enforce the minimum delay since our previous certificate/proposal.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(0u64);
        }

        // If we already certified this round, just nudge the BFT forward and skip.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    Ok(true) => (),
                    Ok(false) => return Ok(0u64),
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(0u64);
        }

        // Skip if we already proposed this exact round.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        {
            // Treat the supplied co-signer accounts (plus ourselves) as the connected validators.
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            connected_validators.insert(self.account.address());

            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Ready if the previous round is the genesis round (0) or reached quorum.
        let mut is_ready = previous_round == 0;
        if previous_round > 0 {
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Drain up to an equal share of the per-batch transmission budget from each worker,
        // skipping anything already in the ledger/storage or failing basic validation.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        let mut transmissions: IndexMap<_, _> = Default::default();
        for worker in self.workers.iter() {
            let mut num_transmissions_included_for_worker = 0;
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // Worker exhausted: move on to the next worker.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                'inner: for (id, transmission) in worker_transmissions {
                    // Skip transmissions already in the ledger (or whose lookup failed).
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Skip duplicates already present in storage.
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Verify the checksum and run the basic (cheap) validity check per kind.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                                continue 'inner;
                            }
                        }
                        // Ratifications and mismatched (id, payload) pairs are never included.
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        _ => continue 'inner,
                    }
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        let current_timestamp = now();

        // Record this round as proposed before building the header.
        *lock_guard = round;

        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        let private_key = *self.account.private_key();
        let committee_id = committee_lookback.id();
        let transmission_ids = transmissions.keys().copied().collect();
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Build the signed batch header and the proposal off the async runtime;
        // on failure, hand the drained transmissions back to the workers.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        info!("Quorum threshold reached - Preparing to certify our batch for round {round}...");

        let batch_id = batch_header.batch_id();

        // Collect a signature from every co-signer account over the batch ID.
        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Certify and store; on failure, return the transmissions to the workers.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(round)
    }

    /// Fabricates, signs, certifies, and stores an empty batch proposal on behalf of
    /// another (simulated) validator for the given round.
    ///
    /// Records our own signature for the fabricated proposal in `signed_proposals`, and
    /// forwards the resulting certificate to the BFT. Fails if our address is not among
    /// the provided co-signers (`other_acc`).
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();
        // Fake proposals carry no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Build the batch header and proposal authored by the simulated validator.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        let batch_id = batch_header.batch_id();
        let mut our_sign: Option<Signature<N>> = None;

        // Gather signatures from all co-signers; remember our own signature.
        for acc in other_acc.iter() {
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Our own signature must be present — it is recorded in `signed_proposals` below.
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Certification is CPU-bound; keep it off the async executor.
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        info!("Stored a batch certificate for validator/round {vid}/{round}");

        // Record our signature for this author, replacing any entry from an older round.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // Already signed this author's proposal for this round: nothing more to do.
                if entry.get().0 == round {
                    return Ok(());
                }
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
            std::collections::hash_map::Entry::Vacant(entry) => {
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        // Forward the fabricated certificate to the BFT DAG.
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                warn!("Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
}
742
impl<N: Network> Primary<N> {
    /// Spawns the primary's background tasks: the proposal loop, the round-increment
    /// loop, and the unconfirmed solution/transaction intake handlers.
    ///
    /// The batch-propose / signature / certified / ping receivers are deliberately
    /// discarded here — only the unconfirmed-transmission channels are consumed.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            rx_batch_propose: _,
            rx_batch_signature: _,
            rx_batch_certified: _,
            rx_primary_ping: _,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Proposal loop: periodically attempt to propose a batch.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                if !self_.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A held propose_lock means a proposal is already in flight.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Round-increment loop: advance once the next round reaches quorum.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if !self_.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                let next_round = self_.current_round().saturating_add(1);
                let is_quorum_threshold_reached = {
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // No certificates for the next round yet: nothing to check.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Intake loop: route unconfirmed solutions to their assigned worker.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                // Process per-solution work on its own task so intake is not blocked.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // The caller may have dropped the callback receiver; ignore send errors.
                    callback.send(result).ok();
                });
            }
        });

        // Intake loop: route unconfirmed transactions to their assigned worker.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    let worker = &self_.workers[worker_id as usize];
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    callback.send(result).ok();
                });
            }
        });
    }

    /// Advances storage to `next_round`, notifying the BFT (if wired) and, when ready,
    /// immediately attempting a batch proposal for the new round.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        let _guard = self.get_tracing_guard();

        // Fast-forward storage round-by-round up to (next_round - 1), clearing any
        // stale proposed batch, but only when within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            // With a BFT, readiness is decided by the BFT; without one, increment directly.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }

    /// Same as [`Self::try_increment_to_the_next_round`], except it does NOT trigger a
    /// follow-up batch proposal — readiness is only logged. Used from the certify path
    /// to avoid re-entering the proposal flow.
    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
        let _guard = self.get_tracing_guard();

        // Fast-forward storage up to (next_round - 1) within the GC window.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            while fast_forward_round < next_round.saturating_sub(1) {
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                *self.proposed_batch.write() = None;
            }
        }

        let current_round = self.current_round();
        if current_round < next_round {
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            else {
                self.storage.increment_to_next_round(current_round)?;
                true
            };

            // Readiness is only logged here; no proposal is triggered (unlike the non-lite variant).
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

        }
        Ok(())
    }

    /// Ensures `timestamp` is at least `MIN_BATCH_DELAY_IN_SECS` after the author's
    /// previous certificate in `previous_round` (or, absent one, after our latest
    /// proposed-batch timestamp). Errors if the delay is not met or time went backwards.
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Fall back to our last proposal's timestamp when no certificate exists.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => *self.latest_proposed_batch_timestamp.read(),
        };

        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

    /// Certifies the proposal, stores the resulting certificate, forwards it to the BFT
    /// (if wired), and advances to the next round via the lite increment path.
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();

        // Certification is CPU-bound; keep it off the async executor.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        if let Some(bft_sender) = self.bft_sender.get() {
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Advance without re-triggering a proposal (the lite variant only logs readiness).
        self.try_increment_to_the_next_round_lite(round + 1).await
    }

    /// Returns the given transmissions to their assigned workers (used when a proposal
    /// fails after draining the workers).
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Processes a certificate received from a peer: validates freshness, fetches any
    /// missing transmissions via the batch header, stores the certificate, and forwards
    /// it to the BFT. No-op if the certificate is GC'd or already stored.
    ///
    /// `IS_SYNCING = true` bypasses the "node is syncing" guard (used for cache replay).
    /// Recursion (via the sync-with-header path) is supported through `async_recursion`.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();
        let batch_header = certificate.batch_header();
        let batch_round = batch_header.round();

        // Ignore rounds that have already been garbage-collected.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // Ignore certificates we already hold.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Re-check: the header sync above may have stored this certificate already.
        if !self.storage.contains_certificate(certificate.id()) {
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            if let Some(bft_sender) = self.bft_sender.get() {
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

    /// Validates a peer's batch header, advancing our round if we are behind schedule
    /// or the peer is far ahead, and returns the transmissions we are missing for it.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        let batch_round = batch_header.round();

        // Reject rounds that have already been garbage-collected.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Advance our round if the batch round already reached quorum ahead of us,
        // or if the peer is beyond our GC horizon.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        if is_behind_schedule || is_peer_far_in_future {
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }

    /// Fetches (via the assigned workers, concurrently) every transmission referenced by
    /// the batch header that is not already in storage. Returns an empty map if the
    /// round is GC'd or the batch was already processed.
    async fn fetch_missing_transmissions(
        &self,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        let _guard = self.get_tracing_guard();
        if self.storage.contains_batch(batch_header.batch_id()) {
            trace!("Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        let workers = self.workers.clone();

        // Issue all fetches concurrently and collect results as they complete.
        let mut fetch_transmissions = FuturesUnordered::new();

        let num_workers = self.num_workers();
        for transmission_id in batch_header.transmission_ids() {
            if !self.storage.contains_transmission(*transmission_id) {
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
            }
        }

        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        while let Some(result) = fetch_transmissions.next().await {
            // Any single failed fetch aborts the whole batch sync.
            let (transmission_id, transmission) = result?;
            transmissions.insert(transmission_id, transmission);
        }
        Ok(transmissions)
    }
}
1219
1220impl<N: Network> Primary<N> {
1221 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1223 self.handles.lock().push(tokio::spawn(future));
1224 }
1225
1226 pub async fn shut_down(&self) {
1228 let _guard = self.get_tracing_guard();
1229 info!("Shutting down the primary...");
1230 self.handles.lock().iter().for_each(|handle| handle.abort());
1232
1233 if self.keep_state {
1235 let proposal_cache = {
1236 let proposal = self.proposed_batch.write().take();
1237 let signed_proposals = self.signed_proposals.read().clone();
1238 let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1239 let pending_certificates = self.storage.get_pending_certificates();
1240 ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1241 };
1242
1243 if let Err(err) = proposal_cache.store(&self.storage_mode, self.tracing.clone()) {
1244 error!("Failed to store the current proposal cache: {err}");
1245 }
1246 }
1247 }
1248}
1249
1250#[cfg(test)]
1251mod tests {
1252 use super::*;
1253 use amareleo_node_bft_ledger_service::MockLedgerService;
1254 use amareleo_node_bft_storage_service::BFTMemoryService;
1255 use snarkvm::{
1256 console::types::Field,
1257 ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1258 prelude::{Address, Signature},
1259 };
1260
1261 use bytes::Bytes;
1262 use indexmap::IndexSet;
1263 use rand::RngCore;
1264
    // The concrete network used by every test in this module.
    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1266
1267 async fn primary_without_handlers(
1269 rng: &mut TestRng,
1270 ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1271 let (accounts, committee) = {
1273 const COMMITTEE_SIZE: usize = 4;
1274 let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1275 let mut members = IndexMap::new();
1276
1277 for i in 0..COMMITTEE_SIZE {
1278 let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1279 let account = Account::new(rng).unwrap();
1280 members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1281 accounts.push((socket_addr, account));
1282 }
1283
1284 (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1285 };
1286
1287 let account = accounts.first().unwrap().1.clone();
1288 let ledger = Arc::new(MockLedgerService::new(committee));
1289 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1290
1291 let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger, None).unwrap();
1293
1294 primary.workers = Arc::from([Worker::new(
1296 0, primary.storage.clone(),
1298 primary.ledger.clone(),
1299 primary.proposed_batch.clone(),
1300 None,
1301 )
1302 .unwrap()]);
1303
1304 (primary, accounts)
1305 }
1306
1307 fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1309 let solution_id = rng.gen::<u64>().into();
1311 let size = rng.gen_range(1024..10 * 1024);
1313 let mut vec = vec![0u8; size];
1315 rng.fill_bytes(&mut vec);
1316 let solution = Data::Buffer(Bytes::from(vec));
1317 (solution_id, solution)
1319 }
1320
1321 fn sample_unconfirmed_transaction(
1323 rng: &mut TestRng,
1324 ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1325 let id = Field::<CurrentNetwork>::rand(rng).into();
1327 let size = rng.gen_range(1024..10 * 1024);
1329 let mut vec = vec![0u8; size];
1331 rng.fill_bytes(&mut vec);
1332 let transaction = Data::Buffer(Bytes::from(vec));
1333 (id, transaction)
1335 }
1336
    /// Builds a `Proposal` authored by `author` for the given `round`,
    /// containing one sampled solution and one sampled transaction, signed
    /// over the provided `previous_certificate_ids` and `timestamp`.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        // Sample one solution and one transaction as the batch contents.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        // Compute the checksums that form part of each transmission ID.
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        // The author's key signs the batch header.
        let private_key = author.private_key();
        // Collect the transmission IDs and the transmissions they refer to.
        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();
        // Sign the batch header over the prepared contents.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Construct the proposal from the signed header and its transmissions.
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }
1377
1378 fn peer_signatures_for_batch(
1380 primary_address: Address<CurrentNetwork>,
1381 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1382 batch_id: Field<CurrentNetwork>,
1383 rng: &mut TestRng,
1384 ) -> IndexSet<Signature<CurrentNetwork>> {
1385 let mut signatures = IndexSet::new();
1386 for (_, account) in accounts {
1387 if account.address() == primary_address {
1388 continue;
1389 }
1390 let signature = account.sign(&[batch_id], rng).unwrap();
1391 signatures.insert(signature);
1392 }
1393 signatures
1394 }
1395
    /// Creates a `BatchCertificate` for `round`, authored by the account that
    /// matches `primary_address` and signed by every other account. Also
    /// returns the transmissions referenced by the certificate's batch header.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        // Use the current wall-clock time as the batch timestamp.
        let timestamp = now();

        // Resolve the authoring account; panics if `primary_address` is not in `accounts`.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // A random committee ID — NOTE(review): presumably the storage under test
        // does not validate the committee ID here; confirm if that assumption changes.
        let committee_id = Field::rand(rng);
        // Sample one solution and one transaction as the batch contents.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        // Compute the checksums that form part of each transmission ID.
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        // Collect the transmission IDs and the transmissions they refer to.
        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header, then gather countersignatures from all other members.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
1440
1441 fn store_certificate_chain(
1443 primary: &Primary<CurrentNetwork>,
1444 accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1445 round: u64,
1446 rng: &mut TestRng,
1447 ) -> IndexSet<Field<CurrentNetwork>> {
1448 let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1449 let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1450 for cur_round in 1..round {
1451 for (_, account) in accounts.iter() {
1452 let (certificate, transmissions) = create_batch_certificate(
1453 account.address(),
1454 accounts,
1455 cur_round,
1456 previous_certificates.clone(),
1457 rng,
1458 );
1459 next_certificates.insert(certificate.id());
1460 assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1461 }
1462
1463 assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1464 previous_certificates = next_certificates;
1465 next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1466 }
1467
1468 previous_certificates
1469 }
1470
    /// Proposing a batch succeeds once the worker holds at least one
    /// solution and one transaction.
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // There is no proposed batch initially.
        assert!(primary.proposed_batch.read().is_none());

        // Sample a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Hand both to the primary's only worker.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Proposing a batch should now succeed.
        assert!(primary.propose_batch().await.is_ok());

    }
1493
    /// Proposing a batch does not error even when there are no transmissions.
    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // There is no proposed batch initially.
        assert!(primary.proposed_batch.read().is_none());

        // Proposing with empty workers should still return Ok.
        assert!(primary.propose_batch().await.is_ok());

    }
1508
    /// Proposing a batch succeeds after the storage has been advanced to a
    /// later round by a stored certificate chain.
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Fill the storage with certificates up to the target round.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Wait out the minimum batch delay before proposing.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Sample a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Hand both to the primary's only worker.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Proposing a batch should succeed in the advanced round.
        assert!(primary.propose_batch().await.is_ok());

    }
1535
    /// When the propose lock is ahead of the storage round, `propose_batch`
    /// returns Ok but produces no proposal; restoring the lock allows
    /// proposing again.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // There is no proposed batch initially.
        assert!(primary.proposed_batch.read().is_none());

        // Sample a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Hand both to the primary's only worker.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Force the propose lock ahead of the storage round.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // With the lock ahead, proposing succeeds but yields no batch.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Restore the propose lock to its previous value.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Proposing should succeed again with the lock restored.
        assert!(primary.propose_batch().await.is_ok());

    }
1570
    /// An existing proposal for a round ahead of the storage round is kept:
    /// proposing again must not replace it.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Fill the storage with certificates for rounds below `round`.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Build a proposal one round ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            &primary.account,
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Install the proposal directly, bypassing the propose path.
        *primary.proposed_batch.write() = Some(proposal);

        // Proposing again keeps the further-ahead proposal in place.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
1599}