1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36 console::account::Address,
37 ledger::{
38 block::Transaction,
39 committee::Committee,
40 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41 puzzle::{Solution, SolutionID},
42 },
43 prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50 parking_lot::{Mutex, RwLock},
51 tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56 collections::{BTreeMap, HashSet},
57 future::Future,
58 sync::{
59 Arc,
60 atomic::{AtomicI64, Ordering},
61 },
62};
63#[cfg(not(feature = "locktick"))]
64use tokio::sync::Mutex as TMutex;
65use tokio::{
66 sync::{OnceCell, oneshot},
67 task::JoinHandle,
68};
69use tracing::subscriber::DefaultGuard;
70
71#[derive(Clone)]
72pub struct BFT<N: Network> {
73 primary: Primary<N>,
75 dag: Arc<RwLock<DAG<N>>>,
77 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
79 leader_certificate_timer: Arc<AtomicI64>,
81 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
83 tracing: Option<TracingHandler>,
85 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
87 bft_lock: Arc<TMutex<()>>,
89}
90
91impl<N: Network> TracingHandlerGuard for BFT<N> {
92 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
94 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
95 }
96}
97
98impl<N: Network> BFT<N> {
99 pub fn new(
101 account: Account<N>,
102 storage: Storage<N>,
103 keep_state: bool,
104 storage_mode: StorageMode,
105 ledger: Arc<dyn LedgerService<N>>,
106 tracing: Option<TracingHandler>,
107 ) -> Result<Self> {
108 Ok(Self {
109 primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
110 dag: Default::default(),
111 leader_certificate: Default::default(),
112 leader_certificate_timer: Default::default(),
113 consensus_sender: Default::default(),
114 tracing: tracing.clone(),
115 handles: Default::default(),
116 bft_lock: Default::default(),
117 })
118 }
119
120 pub async fn run(
122 &mut self,
123 consensus_sender: Option<ConsensusSender<N>>,
124 primary_sender: PrimarySender<N>,
125 primary_receiver: PrimaryReceiver<N>,
126 ) -> Result<()> {
127 guard_info!(self, "Starting the BFT instance...");
128
129 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
131
132 self.start_handlers(bft_receiver);
134
135 let result = self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await;
137 if let Err(err) = result {
138 guard_error!(self, "BFT failed to run the primary instance - {err}");
139 self.shut_down().await;
140 return Err(err);
141 }
142
143 if let Some(consensus_sender) = consensus_sender {
147 let result = self.consensus_sender.set(consensus_sender);
148 if result.is_err() {
149 self.shut_down().await;
150 guard_error!(self, "Consensus sender already set");
151 bail!("Consensus sender already set");
152 }
153 }
154 Ok(())
155 }
156
157 pub fn is_synced(&self) -> bool {
159 self.primary.is_synced()
160 }
161
162 pub const fn primary(&self) -> &Primary<N> {
164 &self.primary
165 }
166
167 pub const fn storage(&self) -> &Storage<N> {
169 self.primary.storage()
170 }
171
172 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
174 self.primary.ledger()
175 }
176
177 pub fn leader(&self) -> Option<Address<N>> {
179 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
180 }
181
182 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
184 &self.leader_certificate
185 }
186}
187
188impl<N: Network> BFT<N> {
189 pub fn num_unconfirmed_transmissions(&self) -> usize {
191 self.primary.num_unconfirmed_transmissions()
192 }
193
194 pub fn num_unconfirmed_ratifications(&self) -> usize {
196 self.primary.num_unconfirmed_ratifications()
197 }
198
199 pub fn num_unconfirmed_solutions(&self) -> usize {
201 self.primary.num_unconfirmed_solutions()
202 }
203
204 pub fn num_unconfirmed_transactions(&self) -> usize {
206 self.primary.num_unconfirmed_transactions()
207 }
208}
209
210impl<N: Network> BFT<N> {
211 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
213 self.primary.worker_transmission_ids()
214 }
215
216 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
218 self.primary.worker_transmissions()
219 }
220
221 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
223 self.primary.worker_solutions()
224 }
225
226 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
228 self.primary.worker_transactions()
229 }
230}
231
232impl<N: Network> BFT<N> {
233 fn update_to_next_round(&self, current_round: u64) -> bool {
235 let storage_round = self.storage().current_round();
237 if current_round < storage_round {
238 guard_debug!(
239 self,
240 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
241 );
242 return false;
243 }
244
245 let is_ready = match current_round % 2 == 0 {
247 true => self.update_leader_certificate_to_even_round(current_round),
248 false => self.is_leader_quorum_or_nonleaders_available(current_round),
249 };
250
251 #[cfg(feature = "metrics")]
252 {
253 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
254 if start > 0 {
256 let end = now();
257 let elapsed = std::time::Duration::from_secs((end - start) as u64);
258 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
259 }
260 }
261
262 if current_round % 2 == 0 {
264 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
266 if !is_ready {
268 guard_trace!(self, is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
269 }
270 let leader_round = leader_certificate.round();
272 match leader_round == current_round {
273 true => {
274 guard_info!(
275 self,
276 "\n\nRound {current_round} elected a leader - {}\n",
277 leader_certificate.author()
278 );
279 #[cfg(feature = "metrics")]
280 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
281 }
282 false => {
283 guard_warn!(self, "BFT failed to elect a leader for round {current_round} (!= {leader_round})")
284 }
285 }
286 } else {
287 match is_ready {
288 true => guard_info!(self, "\n\nRound {current_round} reached quorum without a leader\n"),
289 false => {
290 guard_info!(self, "{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed())
291 }
292 }
293 }
294 }
295
296 if is_ready {
298 if let Err(e) = self.storage().increment_to_next_round(current_round) {
300 guard_warn!(self, "BFT failed to increment to the next round from round {current_round} - {e}");
301 return false;
302 }
303 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
305 }
306
307 is_ready
308 }
309
310 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
316 let current_round = self.storage().current_round();
318 if current_round != even_round {
320 guard_warn!(
321 self,
322 "BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"
323 );
324 return false;
325 }
326
327 if current_round % 2 != 0 || current_round < 2 {
329 guard_error!(self, "BFT cannot update the leader certificate in an odd round");
330 return false;
331 }
332
333 let current_certificates = self.storage().get_certificates_for_round(current_round);
335 if current_certificates.is_empty() {
337 *self.leader_certificate.write() = None;
339 return false;
340 }
341
342 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
344 Ok(committee) => committee,
345 Err(e) => {
346 guard_error!(
347 self,
348 "BFT failed to retrieve the committee lookback for the even round {current_round} - {e}"
349 );
350 return false;
351 }
352 };
353 let leader = match self.ledger().latest_leader() {
355 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
356 _ => {
357 let computed_leader = self.primary.account().address();
359 self.ledger().update_latest_leader(current_round, computed_leader);
369
370 computed_leader
371 }
372 };
373 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
375 *self.leader_certificate.write() = leader_certificate.cloned();
376
377 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
378 }
379
380 fn is_even_round_ready_for_next_round(
384 &self,
385 certificates: IndexSet<BatchCertificate<N>>,
386 committee: Committee<N>,
387 current_round: u64,
388 ) -> bool {
389 let authors = certificates.into_iter().map(|c| c.author()).collect();
391 if !committee.is_quorum_threshold_reached(&authors) {
393 guard_trace!(self, "BFT failed to reach quorum threshold in even round {current_round}");
394 return false;
395 }
396 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
398 if leader_certificate.round() == current_round {
399 return true;
400 }
401 }
402 if self.is_timer_expired() {
404 guard_debug!(
405 self,
406 "BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"
407 );
408 return true;
409 }
410 false
412 }
413
414 fn is_timer_expired(&self) -> bool {
416 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
417 }
418
419 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
424 let current_round = self.storage().current_round();
426 if current_round != odd_round {
428 guard_warn!(
429 self,
430 "BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"
431 );
432 return false;
433 }
434 if current_round % 2 != 1 {
436 guard_error!(self, "BFT does not compute stakes for the leader certificate in an even round");
437 return false;
438 }
439 let current_certificates = self.storage().get_certificates_for_round(current_round);
441 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
443 Ok(committee) => committee,
444 Err(e) => {
445 guard_error!(
446 self,
447 "BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}"
448 );
449 return false;
450 }
451 };
452 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
454 if !committee_lookback.is_quorum_threshold_reached(&authors) {
456 guard_trace!(self, "BFT failed reach quorum threshold in odd round {current_round}. ");
457 return false;
458 }
459 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
461 return true;
463 };
464 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
466 leader_certificate.id(),
467 current_certificates,
468 &committee_lookback,
469 );
470 stake_with_leader >= committee_lookback.availability_threshold()
472 || stake_without_leader >= committee_lookback.quorum_threshold()
473 || self.is_timer_expired()
474 }
475
476 fn compute_stake_for_leader_certificate(
478 &self,
479 leader_certificate_id: Field<N>,
480 current_certificates: IndexSet<BatchCertificate<N>>,
481 current_committee: &Committee<N>,
482 ) -> (u64, u64) {
483 if current_certificates.is_empty() {
485 return (0, 0);
486 }
487
488 let mut stake_with_leader = 0u64;
490 let mut stake_without_leader = 0u64;
492 for certificate in current_certificates {
494 let stake = current_committee.get_stake(certificate.author());
496 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
498 true => stake_with_leader = stake_with_leader.saturating_add(stake),
500 false => stake_without_leader = stake_without_leader.saturating_add(stake),
502 }
503 }
504 (stake_with_leader, stake_without_leader)
506 }
507}
508
509impl<N: Network> BFT<N> {
510 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
512 &self,
513 certificate: BatchCertificate<N>,
514 ) -> Result<()> {
515 let _lock = self.bft_lock.lock().await;
517 let certificate_round = certificate.round();
519 self.dag.write().insert(certificate, self);
521
522 let commit_round = certificate_round.saturating_sub(1);
524 if commit_round % 2 != 0 || commit_round < 2 {
526 return Ok(());
527 }
528 if commit_round <= self.dag.read().last_committed_round() {
530 return Ok(());
531 }
532
533 guard_info!(self, "Checking if the leader is ready to be committed for round {commit_round}...");
535
536 let Ok(_committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
538 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
539 };
540
541 let leader = match self.ledger().latest_leader() {
543 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
544 _ => {
545 let computed_leader = self.primary.account().address();
547 self.ledger().update_latest_leader(commit_round, computed_leader);
553
554 computed_leader
555 }
556 };
557
558 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
560 else {
561 guard_trace!(self, "BFT did not find the leader certificate for commit round {commit_round} yet");
562 return Ok(());
563 };
564 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
566 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
568 };
569 let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
571 else {
572 bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
573 };
574 let authors = certificates
576 .values()
577 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
578 true => Some(c.author()),
579 false => None,
580 })
581 .collect();
582 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
584 guard_trace!(self, "BFT is not ready to commit {commit_round}");
586 return Ok(());
587 }
588
589 guard_info!(self, "Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
591
592 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
594 }
595
596 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
598 &self,
599 leader_certificate: BatchCertificate<N>,
600 ) -> Result<()> {
601 let latest_leader_round = leader_certificate.round();
603 let mut leader_certificates = vec![leader_certificate.clone()];
606 {
607 let leader_round = leader_certificate.round();
609
610 let mut current_certificate = leader_certificate;
611 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
612 {
613 let leader = match self.ledger().latest_leader() {
623 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
624 _ => {
625 let computed_leader = self.primary.account().address();
627 self.ledger().update_latest_leader(round, computed_leader);
636
637 computed_leader
638 }
639 };
640 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
642 else {
643 continue;
644 };
645 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
647 leader_certificates.push(previous_certificate.clone());
649 current_certificate = previous_certificate;
651 }
652 }
653 }
654
655 for leader_certificate in leader_certificates.into_iter().rev() {
657 let leader_round = leader_certificate.round();
659 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
661 Ok(subdag) => subdag,
662 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
663 };
664 if !IS_SYNCING {
666 let mut transmissions = IndexMap::new();
668 let mut seen_transaction_ids = IndexSet::new();
670 let mut seen_solution_ids = IndexSet::new();
672 for certificate in commit_subdag.values().flatten() {
674 for transmission_id in certificate.transmission_ids() {
676 match transmission_id {
680 TransmissionID::Solution(solution_id, _) => {
681 if seen_solution_ids.contains(&solution_id) {
683 continue;
684 }
685 }
686 TransmissionID::Transaction(transaction_id, _) => {
687 if seen_transaction_ids.contains(transaction_id) {
689 continue;
690 }
691 }
692 TransmissionID::Ratification => {
693 bail!("Ratifications are currently not supported in the BFT.")
694 }
695 }
696 if transmissions.contains_key(transmission_id) {
698 continue;
699 }
700 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
703 continue;
704 }
705 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
707 bail!(
708 "BFT failed to retrieve transmission '{}.{}' from round {}",
709 fmt_id(transmission_id),
710 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
711 certificate.round()
712 );
713 };
714 match transmission_id {
716 TransmissionID::Solution(id, _) => {
717 seen_solution_ids.insert(id);
718 }
719 TransmissionID::Transaction(id, _) => {
720 seen_transaction_ids.insert(id);
721 }
722 TransmissionID::Ratification => {}
723 }
724 transmissions.insert(*transmission_id, transmission);
726 }
727 }
728 let subdag = Subdag::from(commit_subdag.clone())?;
731 let anchor_round = subdag.anchor_round();
733 let num_transmissions = transmissions.len();
735 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
737
738 ensure!(
740 anchor_round == leader_round,
741 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
742 );
743
744 if let Some(consensus_sender) = self.consensus_sender.get() {
746 let (callback_sender, callback_receiver) = oneshot::channel();
748 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
750 match callback_receiver.await {
752 Ok(Ok(())) => (), Ok(Err(e)) => {
754 guard_error!(self, "BFT failed to advance the subdag for round {anchor_round} - {e}");
755 return Ok(());
756 }
757 Err(e) => {
758 guard_error!(self, "BFT failed to receive the callback for round {anchor_round} - {e}");
759 return Ok(());
760 }
761 }
762 }
763
764 guard_info!(
765 self,
766 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
767 );
768 }
769
770 let mut dag_write = self.dag.write();
772 for certificate in commit_subdag.values().flatten() {
773 dag_write.commit(certificate, self.storage().max_gc_rounds());
774 }
775 }
776
777 self.storage().garbage_collect_certificates(latest_leader_round);
779
780 Ok(())
781 }
782
783 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
785 &self,
786 leader_certificate: BatchCertificate<N>,
787 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
788 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
790 let mut already_ordered = HashSet::new();
792 let mut buffer = vec![leader_certificate];
794 while let Some(certificate) = buffer.pop() {
796 commit.entry(certificate.round()).or_default().insert(certificate.clone());
798
799 let previous_round = certificate.round().saturating_sub(1);
801 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
802 continue;
803 }
804 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
808 if already_ordered.contains(previous_certificate_id) {
810 continue;
811 }
812 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
814 continue;
815 }
816 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
818 continue;
819 }
820
821 let previous_certificate = {
823 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
825 Some(previous_certificate) => previous_certificate,
827 None => match self.storage().get_certificate(*previous_certificate_id) {
829 Some(previous_certificate) => previous_certificate,
831 None => bail!(
833 "Missing previous certificate {} for round {previous_round}",
834 fmt_id(previous_certificate_id)
835 ),
836 },
837 }
838 };
839 already_ordered.insert(previous_certificate.id());
841 buffer.push(previous_certificate);
843 }
844 }
845 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
847 Ok(commit)
849 }
850
851 fn is_linked(
853 &self,
854 previous_certificate: BatchCertificate<N>,
855 current_certificate: BatchCertificate<N>,
856 ) -> Result<bool> {
857 let mut traversal = vec![current_certificate.clone()];
859 for round in (previous_certificate.round()..current_certificate.round()).rev() {
861 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
863 bail!("BFT failed to retrieve the certificates for past round {round}");
866 };
867 traversal = certificates
869 .into_values()
870 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
871 .collect();
872 }
873 Ok(traversal.contains(&previous_certificate))
874 }
875}
876
877impl<N: Network> BFT<N> {
878 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
880 let BFTReceiver { mut rx_primary_round, mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup } =
881 bft_receiver;
882
883 let self_ = self.clone();
885 self.spawn(async move {
886 while let Some((current_round, callback)) = rx_primary_round.recv().await {
887 callback.send(self_.update_to_next_round(current_round)).ok();
888 }
889 });
890
891 let self_ = self.clone();
893 self.spawn(async move {
894 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
895 let result = self_.update_dag::<true, false>(certificate).await;
897 callback.send(result).ok();
900 }
901 });
902
903 let self_ = self.clone();
905 self.spawn(async move {
906 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
907 self_.sync_bft_dag_at_bootup(certificates).await;
908 }
909 });
910 }
911
912 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
919 let mut dag = self.dag.write();
921
922 for certificate in certificates {
924 dag.commit(&certificate, self.storage().max_gc_rounds());
925 }
926 }
927
928 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
930 self.handles.lock().push(tokio::spawn(future));
931 }
932
933 pub async fn shut_down(&self) {
935 guard_info!(self, "Shutting down the BFT...");
936 let _lock = self.bft_lock.lock().await;
938
939 self.primary.shut_down().await;
941
942 let mut handles = self.handles.lock();
944 handles.iter().for_each(|handle| handle.abort());
945 handles.clear();
946 }
947}
948
949#[cfg(test)]
950mod tests {
951 use crate::{
952 BFT,
953 DEVELOPMENT_MODE_RNG_SEED,
954 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
955 helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
956 };
957
958 use amareleo_chain_account::Account;
959 use amareleo_node_bft_ledger_service::MockLedgerService;
960 use amareleo_node_bft_storage_service::BFTMemoryService;
961 use snarkvm::{
962 console::account::{Address, PrivateKey},
963 ledger::{
964 committee::Committee,
965 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
966 },
967 utilities::TestRng,
968 };
969
970 use aleo_std::StorageMode;
971 use anyhow::Result;
972 use indexmap::{IndexMap, IndexSet};
973 use rand::SeedableRng;
974 use rand_chacha::ChaChaRng;
975 use std::sync::Arc;
976
977 type CurrentNetwork = snarkvm::console::network::MainnetV0;
978
979 fn sample_test_instance(
981 committee_round: Option<u64>,
982 max_gc_rounds: u64,
983 rng: &mut TestRng,
984 ) -> (
985 Committee<CurrentNetwork>,
986 Account<CurrentNetwork>,
987 Arc<MockLedgerService<CurrentNetwork>>,
988 Storage<CurrentNetwork>,
989 ) {
990 let committee = match committee_round {
991 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
992 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
993 };
994 let account = Account::new(rng).unwrap();
995 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
996 let transmissions = Arc::new(BFTMemoryService::new());
997 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
998
999 (committee, account, ledger, storage)
1000 }
1001
1002 #[test]
1003 #[tracing_test::traced_test]
1004 fn test_is_leader_quorum_odd() -> Result<()> {
1005 let rng = &mut TestRng::default();
1006
1007 let mut certificates = IndexSet::new();
1009 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1010 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1011 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1012 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1013
1014 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1016 1,
1017 vec![
1018 certificates[0].author(),
1019 certificates[1].author(),
1020 certificates[2].author(),
1021 certificates[3].author(),
1022 ],
1023 rng,
1024 );
1025
1026 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1028 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1030 let account = Account::new(rng)?;
1032 let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1034 assert!(bft.is_timer_expired());
1035 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1037 assert!(!result);
1039 for certificate in certificates.iter() {
1041 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1042 }
1043 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1045 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1048 *bft.leader_certificate.write() = Some(leader_certificate);
1049 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1051 assert!(result); Ok(())
1054 }
1055
1056 #[test]
1057 #[tracing_test::traced_test]
1058 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1059 let rng = &mut TestRng::default();
1060
1061 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1063 assert_eq!(committee.starting_round(), 1);
1064 assert_eq!(storage.current_round(), 1);
1065 assert_eq!(storage.max_gc_rounds(), 10);
1066
1067 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1069 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1074 assert!(!result);
1075 Ok(())
1076 }
1077
1078 #[test]
1079 #[tracing_test::traced_test]
1080 fn test_is_leader_quorum_even() -> Result<()> {
1081 let rng = &mut TestRng::default();
1082
1083 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1085 assert_eq!(committee.starting_round(), 2);
1086 assert_eq!(storage.current_round(), 2);
1087 assert_eq!(storage.max_gc_rounds(), 10);
1088
1089 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1091 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1095 assert!(!result);
1096 Ok(())
1097 }
1098
1099 #[test]
1100 #[tracing_test::traced_test]
1101 fn test_is_even_round_ready() -> Result<()> {
1102 let rng = &mut TestRng::default();
1103
1104 let mut certificates = IndexSet::new();
1106 certificates.insert(sample_batch_certificate_for_round(2, rng));
1107 certificates.insert(sample_batch_certificate_for_round(2, rng));
1108 certificates.insert(sample_batch_certificate_for_round(2, rng));
1109 certificates.insert(sample_batch_certificate_for_round(2, rng));
1110
1111 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1113 2,
1114 vec![
1115 certificates[0].author(),
1116 certificates[1].author(),
1117 certificates[2].author(),
1118 certificates[3].author(),
1119 ],
1120 rng,
1121 );
1122
1123 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1125 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1127 let account = Account::new(rng)?;
1129 let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1131 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1133 *bft.leader_certificate.write() = Some(leader_certificate);
1134 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1135 assert!(!result);
1137 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1139 assert!(result);
1140
1141 let bft_timer =
1143 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1144 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1146 if !bft_timer.is_timer_expired() {
1147 assert!(!result);
1148 }
1149 let leader_certificate_timeout =
1151 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1152 std::thread::sleep(leader_certificate_timeout);
1153 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1155 if bft_timer.is_timer_expired() {
1156 assert!(result);
1157 } else {
1158 assert!(!result);
1159 }
1160
1161 Ok(())
1162 }
1163
1164 #[test]
1165 #[tracing_test::traced_test]
1166 fn test_update_leader_certificate_odd() -> Result<()> {
1167 let rng = &mut TestRng::default();
1168
1169 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1171 assert_eq!(storage.max_gc_rounds(), 10);
1172
1173 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1175
1176 let result = bft.update_leader_certificate_to_even_round(1);
1178 assert!(!result);
1179 Ok(())
1180 }
1181
1182 #[test]
1183 #[tracing_test::traced_test]
1184 fn test_update_leader_certificate_bad_round() -> Result<()> {
1185 let rng = &mut TestRng::default();
1186
1187 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1189 assert_eq!(storage.max_gc_rounds(), 10);
1190
1191 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1193
1194 let result = bft.update_leader_certificate_to_even_round(6);
1196 assert!(!result);
1197 Ok(())
1198 }
1199
1200 #[test]
1201 #[tracing_test::traced_test]
1202 fn test_update_leader_certificate_even() -> Result<()> {
1203 let rng = &mut TestRng::default();
1204
1205 let current_round = 3;
1207
1208 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1210 current_round,
1211 rng,
1212 );
1213
1214 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1216 2,
1217 vec![
1218 certificates[0].author(),
1219 certificates[1].author(),
1220 certificates[2].author(),
1221 certificates[3].author(),
1222 ],
1223 rng,
1224 );
1225
1226 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1228
1229 let transmissions = Arc::new(BFTMemoryService::new());
1231 let storage = Storage::new(ledger.clone(), transmissions, 10, None);
1232 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1233 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1234 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1235 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1236 assert_eq!(storage.current_round(), 2);
1237
1238 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1241
1242 let account = Account::new(rng)?;
1244 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1245
1246 *bft.leader_certificate.write() = Some(leader_certificate);
1248
1249 let result = bft.update_leader_certificate_to_even_round(2);
1252 assert!(result);
1253
1254 Ok(())
1255 }
1256
1257 #[tokio::test]
1258 #[tracing_test::traced_test]
1259 async fn test_order_dag_with_dfs() -> Result<()> {
1260 let rng = &mut TestRng::default();
1261
1262 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1264
1265 let previous_round = 2; let current_round = previous_round + 1;
1268
1269 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1271 current_round,
1272 rng,
1273 );
1274
1275 {
1279 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1281 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1283
1284 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1286
1287 for certificate in previous_certificates.clone() {
1289 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1290 }
1291
1292 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1294 assert!(result.is_ok());
1295 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1296 assert_eq!(candidate_certificates.len(), 1);
1297 let expected_certificates = vec![certificate.clone()];
1298 assert_eq!(
1299 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1300 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1301 );
1302 assert_eq!(candidate_certificates, expected_certificates);
1303 }
1304
1305 {
1309 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1311 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1313
1314 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1316
1317 for certificate in previous_certificates.clone() {
1319 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1320 }
1321
1322 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1324 assert!(result.is_ok());
1325 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1326 assert_eq!(candidate_certificates.len(), 5);
1327 let expected_certificates = vec![
1328 previous_certificates[0].clone(),
1329 previous_certificates[1].clone(),
1330 previous_certificates[2].clone(),
1331 previous_certificates[3].clone(),
1332 certificate,
1333 ];
1334 assert_eq!(
1335 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1336 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1337 );
1338 assert_eq!(candidate_certificates, expected_certificates);
1339 }
1340
1341 Ok(())
1342 }
1343
1344 #[test]
1345 #[tracing_test::traced_test]
1346 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1347 let rng = &mut TestRng::default();
1348
1349 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1351 assert_eq!(committee.starting_round(), 1);
1352 assert_eq!(storage.current_round(), 1);
1353 assert_eq!(storage.max_gc_rounds(), 1);
1354
1355 let previous_round = 2; let current_round = previous_round + 1;
1358
1359 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1361 current_round,
1362 rng,
1363 );
1364 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1366
1367 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1371
1372 let error_msg = format!(
1374 "Missing previous certificate {} for round {previous_round}",
1375 crate::helpers::fmt_id(previous_certificate_ids[3]),
1376 );
1377
1378 let result = bft.order_dag_with_dfs::<false>(certificate);
1380 assert!(result.is_err());
1381 assert_eq!(result.unwrap_err().to_string(), error_msg);
1382 Ok(())
1383 }
1384
1385 #[tokio::test]
1386 #[tracing_test::traced_test]
1387 async fn test_bft_gc_on_commit() -> Result<()> {
1388 let rng = &mut TestRng::default();
1389
1390 let max_gc_rounds = 1;
1392 let committee_round = 0;
1393 let commit_round = 2;
1394 let current_round = commit_round + 1;
1395
1396 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1398 current_round,
1399 rng,
1400 );
1401
1402 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1404 committee_round,
1405 vec![
1406 certificates[0].author(),
1407 certificates[1].author(),
1408 certificates[2].author(),
1409 certificates[3].author(),
1410 ],
1411 rng,
1412 );
1413
1414 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1416
1417 let transmissions = Arc::new(BFTMemoryService::new());
1419 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1420 for certificate in certificates.iter() {
1422 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1423 }
1424
1425 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1428
1429 let account = Account::new(rng)?;
1431 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1432 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1434
1435 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1437
1438 for certificate in certificates {
1440 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1441 }
1442
1443 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1445
1446 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1448
1449 Ok(())
1450 }
1451
1452 #[tokio::test]
1453 #[tracing_test::traced_test]
1454 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1455 let rng = &mut TestRng::default();
1456
1457 let max_gc_rounds = 1;
1459 let committee_round = 0;
1460 let commit_round = 2;
1461 let current_round = commit_round + 1;
1462
1463 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1465 current_round,
1466 rng,
1467 );
1468
1469 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1471 committee_round,
1472 vec![
1473 certificates[0].author(),
1474 certificates[1].author(),
1475 certificates[2].author(),
1476 certificates[3].author(),
1477 ],
1478 rng,
1479 );
1480
1481 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1483
1484 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1486 for certificate in certificates.iter() {
1488 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1489 }
1490
1491 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1494
1495 let account = Account::new(rng)?;
1497 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1498
1499 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1501
1502 for certificate in certificates.clone() {
1504 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1505 }
1506
1507 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1509
1510 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1514 let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1516
1517 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1519
1520 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1522
1523 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1525 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1526
1527 for certificate in certificates {
1529 let certificate_round = certificate.round();
1530 let certificate_id = certificate.id();
1531 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1533 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1536 }
1537
1538 Ok(())
1539 }
1540
1541 #[tokio::test]
1542 #[tracing_test::traced_test]
1543 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1544 let rng = &mut TestRng::default();
1551 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1552
1553 let private_keys = vec![
1554 PrivateKey::new(rng_pks).unwrap(),
1555 PrivateKey::new(rng_pks).unwrap(),
1556 PrivateKey::new(rng_pks).unwrap(),
1557 PrivateKey::new(rng_pks).unwrap(),
1558 ];
1559 let address0 = Address::try_from(private_keys[0])?;
1560
1561 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1563 let committee_round = 0;
1564 let commit_round = 2;
1565 let current_round = commit_round + 1;
1566 let next_round = current_round + 1;
1567
1568 let (round_to_certificates_map, committee) = {
1570 let addresses = vec![
1571 Address::try_from(private_keys[0])?,
1572 Address::try_from(private_keys[1])?,
1573 Address::try_from(private_keys[2])?,
1574 Address::try_from(private_keys[3])?,
1575 ];
1576 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1577 committee_round,
1578 addresses,
1579 rng,
1580 );
1581 let mut round_to_certificates_map: IndexMap<
1583 u64,
1584 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1585 > = IndexMap::new();
1586 let mut previous_certificates = IndexSet::with_capacity(4);
1587 for _ in 0..4 {
1589 previous_certificates.insert(sample_batch_certificate(rng));
1590 }
1591 for round in 0..commit_round + 3 {
1592 let mut current_certificates = IndexSet::new();
1593 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1594 IndexSet::new()
1595 } else {
1596 previous_certificates.iter().map(|c| c.id()).collect()
1597 };
1598 let transmission_ids =
1599 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1600 .into_iter()
1601 .collect::<IndexSet<_>>();
1602 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1603 let committee_id = committee.id();
1604 for (i, private_key_1) in private_keys.iter().enumerate() {
1605 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1606 private_key_1,
1607 round,
1608 timestamp,
1609 committee_id,
1610 transmission_ids.clone(),
1611 previous_certificate_ids.clone(),
1612 rng,
1613 )
1614 .unwrap();
1615 let mut signatures = IndexSet::with_capacity(4);
1616 for (j, private_key_2) in private_keys.iter().enumerate() {
1617 if i != j {
1618 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1619 }
1620 }
1621 let certificate =
1622 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1623 current_certificates.insert(certificate);
1624 }
1625 round_to_certificates_map.insert(round, current_certificates.clone());
1627 previous_certificates = current_certificates.clone();
1628 }
1629 (round_to_certificates_map, committee)
1630 };
1631
1632 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1634 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1636 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1641 for i in 1..=commit_round {
1642 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1643 if i == commit_round {
1644 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1646 if let Some(c) = leader_certificate {
1647 pre_shutdown_certificates.push(c.clone());
1648 }
1649 continue;
1650 }
1651 pre_shutdown_certificates.extend(certificates);
1652 }
1653 for certificate in pre_shutdown_certificates.iter() {
1654 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1655 }
1656 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1658 Vec::new();
1659 for j in commit_round..=commit_round + 2 {
1660 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1661 post_shutdown_certificates.extend(certificate);
1662 }
1663 for certificate in post_shutdown_certificates.iter() {
1664 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1665 }
1666 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1668 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1669
1670 let account = Account::try_from(private_keys[0])?;
1672 let ledger_dir = default_ledger_dir(1, true, "0");
1673 let storage_mode = amareleo_storage_mode(ledger_dir);
1674 let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1675
1676 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1678
1679 for certificate in pre_shutdown_certificates.clone() {
1681 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1682 }
1683
1684 for certificate in post_shutdown_certificates.clone() {
1686 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1687 }
1688 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1690 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1691 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1692
1693 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1697
1698 let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1700
1701 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1703
1704 for certificate in post_shutdown_certificates.iter() {
1706 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1707 }
1708 for certificate in post_shutdown_certificates.clone() {
1709 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1710 }
1711 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1713 let commit_subdag_metadata_bootup =
1714 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1715 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1716 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1717
1718 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1722
1723 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1725 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1726 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1727 assert!(
1728 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1729 );
1730
1731 for certificate in pre_shutdown_certificates.clone() {
1733 let certificate_round = certificate.round();
1734 let certificate_id = certificate.id();
1735 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1737 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1738 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1741 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1742 }
1743
1744 for certificate in committed_certificates_bootup.clone() {
1746 let certificate_round = certificate.round();
1747 let certificate_id = certificate.id();
1748 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1750 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1751 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1754 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1755 }
1756
1757 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1759
1760 Ok(())
1761 }
1762
1763 #[tokio::test]
1764 #[tracing_test::traced_test]
1765 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1766 let rng = &mut TestRng::default();
1773 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1774
1775 let private_keys = vec![
1776 PrivateKey::new(rng_pks).unwrap(),
1777 PrivateKey::new(rng_pks).unwrap(),
1778 PrivateKey::new(rng_pks).unwrap(),
1779 PrivateKey::new(rng_pks).unwrap(),
1780 ];
1781 let address0 = Address::try_from(private_keys[0])?;
1782
1783 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1785 let committee_round = 0;
1786 let commit_round = 2;
1787 let current_round = commit_round + 1;
1788 let next_round = current_round + 1;
1789
1790 let (round_to_certificates_map, committee) = {
1792 let addresses = vec![
1793 Address::try_from(private_keys[0])?,
1794 Address::try_from(private_keys[1])?,
1795 Address::try_from(private_keys[2])?,
1796 Address::try_from(private_keys[3])?,
1797 ];
1798 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1799 committee_round,
1800 addresses,
1801 rng,
1802 );
1803 let mut round_to_certificates_map: IndexMap<
1805 u64,
1806 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1807 > = IndexMap::new();
1808 let mut previous_certificates = IndexSet::with_capacity(4);
1809 for _ in 0..4 {
1811 previous_certificates.insert(sample_batch_certificate(rng));
1812 }
1813 for round in 0..=commit_round + 2 {
1814 let mut current_certificates = IndexSet::new();
1815 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1816 IndexSet::new()
1817 } else {
1818 previous_certificates.iter().map(|c| c.id()).collect()
1819 };
1820 let transmission_ids =
1821 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1822 .into_iter()
1823 .collect::<IndexSet<_>>();
1824 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1825 let committee_id = committee.id();
1826 for (i, private_key_1) in private_keys.iter().enumerate() {
1827 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1828 private_key_1,
1829 round,
1830 timestamp,
1831 committee_id,
1832 transmission_ids.clone(),
1833 previous_certificate_ids.clone(),
1834 rng,
1835 )
1836 .unwrap();
1837 let mut signatures = IndexSet::with_capacity(4);
1838 for (j, private_key_2) in private_keys.iter().enumerate() {
1839 if i != j {
1840 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1841 }
1842 }
1843 let certificate =
1844 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1845 current_certificates.insert(certificate);
1846 }
1847 round_to_certificates_map.insert(round, current_certificates.clone());
1849 previous_certificates = current_certificates.clone();
1850 }
1851 (round_to_certificates_map, committee)
1852 };
1853
1854 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1856 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1858 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1863 for i in 1..=commit_round {
1864 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1865 if i == commit_round {
1866 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1868 if let Some(c) = leader_certificate {
1869 pre_shutdown_certificates.push(c.clone());
1870 }
1871 continue;
1872 }
1873 pre_shutdown_certificates.extend(certificates);
1874 }
1875 for certificate in pre_shutdown_certificates.iter() {
1876 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1877 }
1878 let account = Account::try_from(private_keys[0])?;
1880 let bootup_bft =
1881 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1882 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1884 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1886
1887 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1889 Vec::new();
1890 for j in commit_round..=commit_round + 2 {
1891 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1892 post_shutdown_certificates.extend(certificate);
1893 }
1894 for certificate in post_shutdown_certificates.iter() {
1895 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1896 }
1897
1898 for certificate in post_shutdown_certificates.clone() {
1900 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1901 }
1902
1903 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1905 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1906 let committed_certificates = commit_subdag.values().flatten();
1907
1908 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1910 for committed_certificate in committed_certificates.clone() {
1911 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1912 }
1913 }
1914 Ok(())
1915 }
1916}