1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36 console::account::Address,
37 ledger::{
38 block::Transaction,
39 committee::Committee,
40 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41 puzzle::{Solution, SolutionID},
42 },
43 prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48use parking_lot::{Mutex, RwLock};
49use std::{
50 collections::{BTreeMap, HashSet},
51 future::Future,
52 sync::{
53 Arc,
54 atomic::{AtomicI64, Ordering},
55 },
56};
57use tokio::{
58 sync::{Mutex as TMutex, OnceCell, oneshot},
59 task::JoinHandle,
60};
61use tracing::subscriber::DefaultGuard;
62
63#[derive(Clone)]
64pub struct BFT<N: Network> {
65 primary: Primary<N>,
67 dag: Arc<RwLock<DAG<N>>>,
69 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
71 leader_certificate_timer: Arc<AtomicI64>,
73 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
75 tracing: Option<TracingHandler>,
77 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
79 bft_lock: Arc<TMutex<()>>,
81}
82
83impl<N: Network> TracingHandlerGuard for BFT<N> {
84 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
86 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
87 }
88}
89
90impl<N: Network> BFT<N> {
91 pub fn new(
93 account: Account<N>,
94 storage: Storage<N>,
95 keep_state: bool,
96 storage_mode: StorageMode,
97 ledger: Arc<dyn LedgerService<N>>,
98 tracing: Option<TracingHandler>,
99 ) -> Result<Self> {
100 Ok(Self {
101 primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
102 dag: Default::default(),
103 leader_certificate: Default::default(),
104 leader_certificate_timer: Default::default(),
105 consensus_sender: Default::default(),
106 tracing: tracing.clone(),
107 handles: Default::default(),
108 bft_lock: Default::default(),
109 })
110 }
111
112 pub async fn run(
114 &mut self,
115 consensus_sender: Option<ConsensusSender<N>>,
116 primary_sender: PrimarySender<N>,
117 primary_receiver: PrimaryReceiver<N>,
118 ) -> Result<()> {
119 guard_info!(self, "Starting the BFT instance...");
120
121 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
123
124 self.start_handlers(bft_receiver);
126
127 let result = self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await;
129 if let Err(err) = result {
130 guard_error!(self, "BFT failed to run the primary instance - {err}");
131 self.shut_down().await;
132 return Err(err);
133 }
134
135 if let Some(consensus_sender) = consensus_sender {
139 let result = self.consensus_sender.set(consensus_sender);
140 if result.is_err() {
141 self.shut_down().await;
142 guard_error!(self, "Consensus sender already set");
143 bail!("Consensus sender already set");
144 }
145 }
146 Ok(())
147 }
148
149 pub fn is_synced(&self) -> bool {
151 self.primary.is_synced()
152 }
153
154 pub const fn primary(&self) -> &Primary<N> {
156 &self.primary
157 }
158
159 pub const fn storage(&self) -> &Storage<N> {
161 self.primary.storage()
162 }
163
164 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
166 self.primary.ledger()
167 }
168
169 pub fn leader(&self) -> Option<Address<N>> {
171 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
172 }
173
174 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
176 &self.leader_certificate
177 }
178}
179
180impl<N: Network> BFT<N> {
181 pub fn num_unconfirmed_transmissions(&self) -> usize {
183 self.primary.num_unconfirmed_transmissions()
184 }
185
186 pub fn num_unconfirmed_ratifications(&self) -> usize {
188 self.primary.num_unconfirmed_ratifications()
189 }
190
191 pub fn num_unconfirmed_solutions(&self) -> usize {
193 self.primary.num_unconfirmed_solutions()
194 }
195
196 pub fn num_unconfirmed_transactions(&self) -> usize {
198 self.primary.num_unconfirmed_transactions()
199 }
200}
201
202impl<N: Network> BFT<N> {
203 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
205 self.primary.worker_transmission_ids()
206 }
207
208 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
210 self.primary.worker_transmissions()
211 }
212
213 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
215 self.primary.worker_solutions()
216 }
217
218 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
220 self.primary.worker_transactions()
221 }
222}
223
224impl<N: Network> BFT<N> {
225 fn update_to_next_round(&self, current_round: u64) -> bool {
227 let storage_round = self.storage().current_round();
229 if current_round < storage_round {
230 guard_debug!(
231 self,
232 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
233 );
234 return false;
235 }
236
237 let is_ready = match current_round % 2 == 0 {
239 true => self.update_leader_certificate_to_even_round(current_round),
240 false => self.is_leader_quorum_or_nonleaders_available(current_round),
241 };
242
243 #[cfg(feature = "metrics")]
244 {
245 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
246 if start > 0 {
248 let end = now();
249 let elapsed = std::time::Duration::from_secs((end - start) as u64);
250 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
251 }
252 }
253
254 if current_round % 2 == 0 {
256 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
258 if !is_ready {
260 guard_trace!(self, is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
261 }
262 let leader_round = leader_certificate.round();
264 match leader_round == current_round {
265 true => {
266 guard_info!(
267 self,
268 "\n\nRound {current_round} elected a leader - {}\n",
269 leader_certificate.author()
270 );
271 #[cfg(feature = "metrics")]
272 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
273 }
274 false => {
275 guard_warn!(self, "BFT failed to elect a leader for round {current_round} (!= {leader_round})")
276 }
277 }
278 } else {
279 match is_ready {
280 true => guard_info!(self, "\n\nRound {current_round} reached quorum without a leader\n"),
281 false => {
282 guard_info!(self, "{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed())
283 }
284 }
285 }
286 }
287
288 if is_ready {
290 if let Err(e) = self.storage().increment_to_next_round(current_round) {
292 guard_warn!(self, "BFT failed to increment to the next round from round {current_round} - {e}");
293 return false;
294 }
295 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
297 }
298
299 is_ready
300 }
301
302 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
308 let current_round = self.storage().current_round();
310 if current_round != even_round {
312 guard_warn!(
313 self,
314 "BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"
315 );
316 return false;
317 }
318
319 if current_round % 2 != 0 || current_round < 2 {
321 guard_error!(self, "BFT cannot update the leader certificate in an odd round");
322 return false;
323 }
324
325 let current_certificates = self.storage().get_certificates_for_round(current_round);
327 if current_certificates.is_empty() {
329 *self.leader_certificate.write() = None;
331 return false;
332 }
333
334 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
336 Ok(committee) => committee,
337 Err(e) => {
338 guard_error!(
339 self,
340 "BFT failed to retrieve the committee lookback for the even round {current_round} - {e}"
341 );
342 return false;
343 }
344 };
345 let leader = match self.ledger().latest_leader() {
347 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
348 _ => {
349 let computed_leader = self.primary.account().address();
351 self.ledger().update_latest_leader(current_round, computed_leader);
361
362 computed_leader
363 }
364 };
365 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
367 *self.leader_certificate.write() = leader_certificate.cloned();
368
369 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
370 }
371
372 fn is_even_round_ready_for_next_round(
376 &self,
377 certificates: IndexSet<BatchCertificate<N>>,
378 committee: Committee<N>,
379 current_round: u64,
380 ) -> bool {
381 let authors = certificates.into_iter().map(|c| c.author()).collect();
383 if !committee.is_quorum_threshold_reached(&authors) {
385 guard_trace!(self, "BFT failed to reach quorum threshold in even round {current_round}");
386 return false;
387 }
388 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
390 if leader_certificate.round() == current_round {
391 return true;
392 }
393 }
394 if self.is_timer_expired() {
396 guard_debug!(
397 self,
398 "BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"
399 );
400 return true;
401 }
402 false
404 }
405
406 fn is_timer_expired(&self) -> bool {
408 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
409 }
410
411 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
416 let current_round = self.storage().current_round();
418 if current_round != odd_round {
420 guard_warn!(
421 self,
422 "BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"
423 );
424 return false;
425 }
426 if current_round % 2 != 1 {
428 guard_error!(self, "BFT does not compute stakes for the leader certificate in an even round");
429 return false;
430 }
431 let current_certificates = self.storage().get_certificates_for_round(current_round);
433 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
435 Ok(committee) => committee,
436 Err(e) => {
437 guard_error!(
438 self,
439 "BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}"
440 );
441 return false;
442 }
443 };
444 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
446 if !committee_lookback.is_quorum_threshold_reached(&authors) {
448 guard_trace!(self, "BFT failed reach quorum threshold in odd round {current_round}. ");
449 return false;
450 }
451 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
453 return true;
455 };
456 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
458 leader_certificate.id(),
459 current_certificates,
460 &committee_lookback,
461 );
462 stake_with_leader >= committee_lookback.availability_threshold()
464 || stake_without_leader >= committee_lookback.quorum_threshold()
465 || self.is_timer_expired()
466 }
467
468 fn compute_stake_for_leader_certificate(
470 &self,
471 leader_certificate_id: Field<N>,
472 current_certificates: IndexSet<BatchCertificate<N>>,
473 current_committee: &Committee<N>,
474 ) -> (u64, u64) {
475 if current_certificates.is_empty() {
477 return (0, 0);
478 }
479
480 let mut stake_with_leader = 0u64;
482 let mut stake_without_leader = 0u64;
484 for certificate in current_certificates {
486 let stake = current_committee.get_stake(certificate.author());
488 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
490 true => stake_with_leader = stake_with_leader.saturating_add(stake),
492 false => stake_without_leader = stake_without_leader.saturating_add(stake),
494 }
495 }
496 (stake_with_leader, stake_without_leader)
498 }
499}
500
501impl<N: Network> BFT<N> {
502 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
504 &self,
505 certificate: BatchCertificate<N>,
506 ) -> Result<()> {
507 let _lock = self.bft_lock.lock().await;
509 let certificate_round = certificate.round();
511 self.dag.write().insert(certificate, self);
513
514 let commit_round = certificate_round.saturating_sub(1);
516 if commit_round % 2 != 0 || commit_round < 2 {
518 return Ok(());
519 }
520 if commit_round <= self.dag.read().last_committed_round() {
522 return Ok(());
523 }
524
525 guard_info!(self, "Checking if the leader is ready to be committed for round {commit_round}...");
527
528 let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
530 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
531 };
532
533 let leader = match self.ledger().latest_leader() {
535 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
536 _ => {
537 let computed_leader = self.primary.account().address();
539 self.ledger().update_latest_leader(commit_round, computed_leader);
545
546 computed_leader
547 }
548 };
549
550 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
552 else {
553 guard_trace!(self, "BFT did not find the leader certificate for commit round {commit_round} yet");
554 return Ok(());
555 };
556 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
558 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
560 };
561 let authors = certificates
563 .values()
564 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
565 true => Some(c.author()),
566 false => None,
567 })
568 .collect();
569 if !committee_lookback.is_availability_threshold_reached(&authors) {
571 guard_trace!(self, "BFT is not ready to commit {commit_round}");
573 return Ok(());
574 }
575
576 guard_info!(self, "Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
578
579 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
581 }
582
583 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
585 &self,
586 leader_certificate: BatchCertificate<N>,
587 ) -> Result<()> {
588 let latest_leader_round = leader_certificate.round();
590 let mut leader_certificates = vec![leader_certificate.clone()];
593 {
594 let leader_round = leader_certificate.round();
596
597 let mut current_certificate = leader_certificate;
598 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
599 {
600 let leader = match self.ledger().latest_leader() {
610 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
611 _ => {
612 let computed_leader = self.primary.account().address();
614 self.ledger().update_latest_leader(round, computed_leader);
623
624 computed_leader
625 }
626 };
627 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
629 else {
630 continue;
631 };
632 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
634 leader_certificates.push(previous_certificate.clone());
636 current_certificate = previous_certificate;
638 }
639 }
640 }
641
642 for leader_certificate in leader_certificates.into_iter().rev() {
644 let leader_round = leader_certificate.round();
646 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
648 Ok(subdag) => subdag,
649 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
650 };
651 if !IS_SYNCING {
653 let mut transmissions = IndexMap::new();
655 let mut seen_transaction_ids = IndexSet::new();
657 let mut seen_solution_ids = IndexSet::new();
659 for certificate in commit_subdag.values().flatten() {
661 for transmission_id in certificate.transmission_ids() {
663 match transmission_id {
667 TransmissionID::Solution(solution_id, _) => {
668 if seen_solution_ids.contains(&solution_id) {
670 continue;
671 }
672 }
673 TransmissionID::Transaction(transaction_id, _) => {
674 if seen_transaction_ids.contains(transaction_id) {
676 continue;
677 }
678 }
679 TransmissionID::Ratification => {
680 bail!("Ratifications are currently not supported in the BFT.")
681 }
682 }
683 if transmissions.contains_key(transmission_id) {
685 continue;
686 }
687 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
690 continue;
691 }
692 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
694 bail!(
695 "BFT failed to retrieve transmission '{}.{}' from round {}",
696 fmt_id(transmission_id),
697 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
698 certificate.round()
699 );
700 };
701 match transmission_id {
703 TransmissionID::Solution(id, _) => {
704 seen_solution_ids.insert(id);
705 }
706 TransmissionID::Transaction(id, _) => {
707 seen_transaction_ids.insert(id);
708 }
709 TransmissionID::Ratification => {}
710 }
711 transmissions.insert(*transmission_id, transmission);
713 }
714 }
715 let subdag = Subdag::from(commit_subdag.clone())?;
718 let anchor_round = subdag.anchor_round();
720 let num_transmissions = transmissions.len();
722 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
724
725 ensure!(
727 anchor_round == leader_round,
728 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
729 );
730
731 if let Some(consensus_sender) = self.consensus_sender.get() {
733 let (callback_sender, callback_receiver) = oneshot::channel();
735 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
737 match callback_receiver.await {
739 Ok(Ok(())) => (), Ok(Err(e)) => {
741 guard_error!(self, "BFT failed to advance the subdag for round {anchor_round} - {e}");
742 return Ok(());
743 }
744 Err(e) => {
745 guard_error!(self, "BFT failed to receive the callback for round {anchor_round} - {e}");
746 return Ok(());
747 }
748 }
749 }
750
751 guard_info!(
752 self,
753 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
754 );
755 }
756
757 let mut dag_write = self.dag.write();
759 for certificate in commit_subdag.values().flatten() {
760 dag_write.commit(certificate, self.storage().max_gc_rounds());
761 }
762 }
763
764 self.storage().garbage_collect_certificates(latest_leader_round);
766
767 Ok(())
768 }
769
770 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
772 &self,
773 leader_certificate: BatchCertificate<N>,
774 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
775 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
777 let mut already_ordered = HashSet::new();
779 let mut buffer = vec![leader_certificate];
781 while let Some(certificate) = buffer.pop() {
783 commit.entry(certificate.round()).or_default().insert(certificate.clone());
785
786 let previous_round = certificate.round().saturating_sub(1);
788 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
789 continue;
790 }
791 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
795 if already_ordered.contains(previous_certificate_id) {
797 continue;
798 }
799 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
801 continue;
802 }
803 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
805 continue;
806 }
807
808 let previous_certificate = {
810 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
812 Some(previous_certificate) => previous_certificate,
814 None => match self.storage().get_certificate(*previous_certificate_id) {
816 Some(previous_certificate) => previous_certificate,
818 None => bail!(
820 "Missing previous certificate {} for round {previous_round}",
821 fmt_id(previous_certificate_id)
822 ),
823 },
824 }
825 };
826 already_ordered.insert(previous_certificate.id());
828 buffer.push(previous_certificate);
830 }
831 }
832 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
834 Ok(commit)
836 }
837
838 fn is_linked(
840 &self,
841 previous_certificate: BatchCertificate<N>,
842 current_certificate: BatchCertificate<N>,
843 ) -> Result<bool> {
844 let mut traversal = vec![current_certificate.clone()];
846 for round in (previous_certificate.round()..current_certificate.round()).rev() {
848 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
850 bail!("BFT failed to retrieve the certificates for past round {round}");
853 };
854 traversal = certificates
856 .into_values()
857 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
858 .collect();
859 }
860 Ok(traversal.contains(&previous_certificate))
861 }
862}
863
864impl<N: Network> BFT<N> {
865 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
867 let BFTReceiver { mut rx_primary_round, mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup } =
868 bft_receiver;
869
870 let self_ = self.clone();
872 self.spawn(async move {
873 while let Some((current_round, callback)) = rx_primary_round.recv().await {
874 callback.send(self_.update_to_next_round(current_round)).ok();
875 }
876 });
877
878 let self_ = self.clone();
880 self.spawn(async move {
881 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
882 let result = self_.update_dag::<true, false>(certificate).await;
884 callback.send(result).ok();
887 }
888 });
889
890 let self_ = self.clone();
892 self.spawn(async move {
893 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
894 self_.sync_bft_dag_at_bootup(certificates).await;
895 }
896 });
897 }
898
899 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
906 let mut dag = self.dag.write();
908
909 for certificate in certificates {
911 dag.commit(&certificate, self.storage().max_gc_rounds());
912 }
913 }
914
915 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
917 self.handles.lock().push(tokio::spawn(future));
918 }
919
920 pub async fn shut_down(&self) {
922 guard_info!(self, "Shutting down the BFT...");
923 let _lock = self.bft_lock.lock().await;
925
926 self.primary.shut_down().await;
928
929 let mut handles = self.handles.lock();
931 handles.iter().for_each(|handle| handle.abort());
932 handles.clear();
933 }
934}
935
936#[cfg(test)]
937mod tests {
938 use crate::{
939 BFT,
940 DEVELOPMENT_MODE_RNG_SEED,
941 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
942 helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
943 };
944
945 use amareleo_chain_account::Account;
946 use amareleo_node_bft_ledger_service::MockLedgerService;
947 use amareleo_node_bft_storage_service::BFTMemoryService;
948 use snarkvm::{
949 console::account::{Address, PrivateKey},
950 ledger::{
951 committee::Committee,
952 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
953 },
954 utilities::TestRng,
955 };
956
957 use aleo_std::StorageMode;
958 use anyhow::Result;
959 use indexmap::{IndexMap, IndexSet};
960 use rand::SeedableRng;
961 use rand_chacha::ChaChaRng;
962 use std::sync::Arc;
963
964 type CurrentNetwork = snarkvm::console::network::MainnetV0;
965
966 fn sample_test_instance(
968 committee_round: Option<u64>,
969 max_gc_rounds: u64,
970 rng: &mut TestRng,
971 ) -> (
972 Committee<CurrentNetwork>,
973 Account<CurrentNetwork>,
974 Arc<MockLedgerService<CurrentNetwork>>,
975 Storage<CurrentNetwork>,
976 ) {
977 let committee = match committee_round {
978 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
979 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
980 };
981 let account = Account::new(rng).unwrap();
982 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
983 let transmissions = Arc::new(BFTMemoryService::new());
984 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
985
986 (committee, account, ledger, storage)
987 }
988
/// Checks the odd-round readiness check: not ready without certificates in storage, and
/// ready once the round-1 certificates are stored, both without and with a leader
/// certificate set (the timer is expired throughout).
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
    use snarkvm::ledger::{
        committee::test_helpers::sample_committee_for_round_and_members,
        narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids,
    };

    let rng = &mut TestRng::default();

    // Sample four round-1 certificates without previous certificates.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(sample_batch_certificate_for_round_with_previous_certificate_ids(
            1,
            IndexSet::new(),
            rng,
        ));
    }

    // Build a committee out of the four certificate authors.
    let members = (0..4).map(|index| certificates[index].author()).collect::<Vec<_>>();
    let committee = sample_committee_for_round_and_members(1, members, rng);

    // Set up the ledger, storage, account and BFT.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
    assert!(bft.is_timer_expired());

    // Storage holds no certificates yet, so quorum cannot be reached.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(1));

    // With all four certificates stored and no leader certificate, the round is ready.
    for certificate in &certificates {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    // With a leader certificate set, the round is still ready.
    let leader_certificate = sample_batch_certificate(rng);
    *bft.leader_certificate.write() = Some(leader_certificate);
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    Ok(())
}
1042
/// Checks that the odd-round readiness check rejects round 2 while storage is at round 1.
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample an instance whose committee and storage start at round 1.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // Storage is at round 1, so asking about round 2 must fail.
    let is_available = bft.is_leader_quorum_or_nonleaders_available(2);
    assert!(!is_available);
    Ok(())
}
1064
/// Checks that the odd-round readiness check rejects an even round even when storage is
/// at that round.
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample an instance whose committee and storage start at round 2.
    let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
    assert_eq!(committee.starting_round(), 2);
    assert_eq!(storage.current_round(), 2);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // Round 2 is even, so the odd-round check must fail.
    let is_available = bft.is_leader_quorum_or_nonleaders_available(2);
    assert!(!is_available);
    Ok(())
}
1085
/// Checks the even-round readiness check with a leader certificate for the round, and
/// without one before and after the leader certificate delay has passed.
#[test]
#[tracing_test::traced_test]
fn test_is_even_round_ready() -> Result<()> {
    use snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members;

    let rng = &mut TestRng::default();

    // Sample four round-2 certificates.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(sample_batch_certificate_for_round(2, rng));
    }

    // Build a committee out of the four certificate authors.
    let members = (0..4).map(|index| certificates[index].author()).collect::<Vec<_>>();
    let committee = sample_committee_for_round_and_members(2, members, rng);

    // Set up the ledger, storage, account and BFT.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;

    // Install a round-2 leader certificate.
    let leader_certificate = sample_batch_certificate_for_round(2, rng);
    *bft.leader_certificate.write() = Some(leader_certificate);

    // No certificates means no quorum, so the round is not ready.
    let without_certificates = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
    assert!(!without_certificates);

    // With quorum and a leader certificate for the round, the round is ready.
    let with_certificates = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    assert!(with_certificates);

    // A second instance without a leader certificate depends on the timer alone.
    let bft_timer =
        BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
    let before_delay = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if !bft_timer.is_timer_expired() {
        assert!(!before_delay);
    }

    // Wait out the leader certificate delay and check again.
    std::thread::sleep(std::time::Duration::from_secs(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64));
    let after_delay = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    // The result must agree with the timer state.
    assert_eq!(after_delay, bft_timer.is_timer_expired());

    Ok(())
}
1150
/// Checks that updating the leader certificate is refused for the odd round 1.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_odd() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample an instance with the default committee.
    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // Round 1 is odd, so the update must report `false`.
    let updated = bft.update_leader_certificate_to_even_round(1);
    assert!(!updated);
    Ok(())
}
1168
/// Checks that updating the leader certificate is refused for round 6 on a freshly
/// sampled instance.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_bad_round() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample an instance with the default committee.
    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // The update for round 6 must report `false`.
    let updated = bft.update_leader_certificate_to_even_round(6);
    assert!(!updated);
    Ok(())
}
1186
/// Checks that updating the leader certificate succeeds for round 2 once storage holds
/// four round-2 certificates from the committee members.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    use snarkvm::ledger::{
        committee::test_helpers::sample_committee_for_round_and_members,
        narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates,
    };

    let rng = &mut TestRng::default();

    // Sample a round-3 certificate; only its previous certificates are used below.
    let current_round = 3;
    let (_, certificates) = sample_batch_certificate_with_previous_certificates(current_round, rng);

    // Build a round-2 committee out of the first four certificate authors.
    let members = (0..4).map(|index| certificates[index].author()).collect::<Vec<_>>();
    let committee = sample_committee_for_round_and_members(2, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));

    // Store the four certificates, which brings storage to round 2.
    let transmissions = Arc::new(BFTMemoryService::new());
    let storage = Storage::new(ledger.clone(), transmissions, 10, None);
    for index in 0..4 {
        storage.testing_only_insert_certificate_testing_only(certificates[index].clone());
    }
    assert_eq!(storage.current_round(), 2);

    // Fetch the round-2 certificate of the first author to use as the leader certificate.
    let leader = certificates[0].author();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();

    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;

    *bft.leader_certificate.write() = Some(leader_certificate);

    // The update for round 2 must report `true`.
    let updated = bft.update_leader_certificate_to_even_round(2);
    assert!(updated);

    Ok(())
}
1243
1244 #[tokio::test]
1245 #[tracing_test::traced_test]
1246 async fn test_order_dag_with_dfs() -> Result<()> {
1247 let rng = &mut TestRng::default();
1248
1249 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1251
1252 let previous_round = 2; let current_round = previous_round + 1;
1255
1256 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1258 current_round,
1259 rng,
1260 );
1261
1262 {
1266 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1268 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1270
1271 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1273
1274 for certificate in previous_certificates.clone() {
1276 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1277 }
1278
1279 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1281 assert!(result.is_ok());
1282 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1283 assert_eq!(candidate_certificates.len(), 1);
1284 let expected_certificates = vec![certificate.clone()];
1285 assert_eq!(
1286 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1287 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1288 );
1289 assert_eq!(candidate_certificates, expected_certificates);
1290 }
1291
1292 {
1296 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1298 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1300
1301 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1303
1304 for certificate in previous_certificates.clone() {
1306 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1307 }
1308
1309 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1311 assert!(result.is_ok());
1312 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1313 assert_eq!(candidate_certificates.len(), 5);
1314 let expected_certificates = vec![
1315 previous_certificates[0].clone(),
1316 previous_certificates[1].clone(),
1317 previous_certificates[2].clone(),
1318 previous_certificates[3].clone(),
1319 certificate,
1320 ];
1321 assert_eq!(
1322 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1323 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1324 );
1325 assert_eq!(candidate_certificates, expected_certificates);
1326 }
1327
1328 Ok(())
1329 }
1330
/// Checks that `order_dag_with_dfs` fails with a "Missing previous certificate" error when the
/// DAG has not been given any of the sampled previous certificates.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates;

    let rng = &mut TestRng::default();

    // Sanity-check the sampled instance: committee and storage start at round 1, with 1 GC round.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    let previous_round = 2;
    let current_round = previous_round + 1;

    let (certificate, previous_certificates) =
        sample_batch_certificate_with_previous_certificates(current_round, rng);
    let previous_certificate_ids =
        previous_certificates.iter().map(|previous| previous.id()).collect::<IndexSet<_>>();

    // Note: `update_dag` is never called here, so the DAG holds none of the previous certificates.
    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // The expected error names the previous certificate ID at index 3.
    let expected_message = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    let outcome = bft.order_dag_with_dfs::<false>(certificate);
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().to_string(), expected_message);
    Ok(())
}
1371
1372 #[tokio::test]
1373 #[tracing_test::traced_test]
1374 async fn test_bft_gc_on_commit() -> Result<()> {
1375 let rng = &mut TestRng::default();
1376
1377 let max_gc_rounds = 1;
1379 let committee_round = 0;
1380 let commit_round = 2;
1381 let current_round = commit_round + 1;
1382
1383 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1385 current_round,
1386 rng,
1387 );
1388
1389 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1391 committee_round,
1392 vec![
1393 certificates[0].author(),
1394 certificates[1].author(),
1395 certificates[2].author(),
1396 certificates[3].author(),
1397 ],
1398 rng,
1399 );
1400
1401 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1403
1404 let transmissions = Arc::new(BFTMemoryService::new());
1406 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1407 for certificate in certificates.iter() {
1409 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1410 }
1411
1412 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1415
1416 let account = Account::new(rng)?;
1418 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1419 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1421
1422 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1424
1425 for certificate in certificates {
1427 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1428 }
1429
1430 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1432
1433 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1435
1436 Ok(())
1437 }
1438
1439 #[tokio::test]
1440 #[tracing_test::traced_test]
1441 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1442 let rng = &mut TestRng::default();
1443
1444 let max_gc_rounds = 1;
1446 let committee_round = 0;
1447 let commit_round = 2;
1448 let current_round = commit_round + 1;
1449
1450 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1452 current_round,
1453 rng,
1454 );
1455
1456 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1458 committee_round,
1459 vec![
1460 certificates[0].author(),
1461 certificates[1].author(),
1462 certificates[2].author(),
1463 certificates[3].author(),
1464 ],
1465 rng,
1466 );
1467
1468 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1470
1471 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1473 for certificate in certificates.iter() {
1475 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1476 }
1477
1478 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1481
1482 let account = Account::new(rng)?;
1484 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1485
1486 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1488
1489 for certificate in certificates.clone() {
1491 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1492 }
1493
1494 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1496
1497 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1501 let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1503
1504 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1506
1507 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1509
1510 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1512 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1513
1514 for certificate in certificates {
1516 let certificate_round = certificate.round();
1517 let certificate_id = certificate.id();
1518 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1520 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1523 }
1524
1525 Ok(())
1526 }
1527
1528 #[tokio::test]
1529 #[tracing_test::traced_test]
1530 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1531 let rng = &mut TestRng::default();
1538 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1539
1540 let private_keys = vec![
1541 PrivateKey::new(rng_pks).unwrap(),
1542 PrivateKey::new(rng_pks).unwrap(),
1543 PrivateKey::new(rng_pks).unwrap(),
1544 PrivateKey::new(rng_pks).unwrap(),
1545 ];
1546 let address0 = Address::try_from(private_keys[0])?;
1547
1548 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1550 let committee_round = 0;
1551 let commit_round = 2;
1552 let current_round = commit_round + 1;
1553 let next_round = current_round + 1;
1554
1555 let (round_to_certificates_map, committee) = {
1557 let addresses = vec![
1558 Address::try_from(private_keys[0])?,
1559 Address::try_from(private_keys[1])?,
1560 Address::try_from(private_keys[2])?,
1561 Address::try_from(private_keys[3])?,
1562 ];
1563 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1564 committee_round,
1565 addresses,
1566 rng,
1567 );
1568 let mut round_to_certificates_map: IndexMap<
1570 u64,
1571 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1572 > = IndexMap::new();
1573 let mut previous_certificates = IndexSet::with_capacity(4);
1574 for _ in 0..4 {
1576 previous_certificates.insert(sample_batch_certificate(rng));
1577 }
1578 for round in 0..commit_round + 3 {
1579 let mut current_certificates = IndexSet::new();
1580 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1581 IndexSet::new()
1582 } else {
1583 previous_certificates.iter().map(|c| c.id()).collect()
1584 };
1585 let transmission_ids =
1586 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1587 .into_iter()
1588 .collect::<IndexSet<_>>();
1589 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1590 let committee_id = committee.id();
1591 for (i, private_key_1) in private_keys.iter().enumerate() {
1592 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1593 private_key_1,
1594 round,
1595 timestamp,
1596 committee_id,
1597 transmission_ids.clone(),
1598 previous_certificate_ids.clone(),
1599 rng,
1600 )
1601 .unwrap();
1602 let mut signatures = IndexSet::with_capacity(4);
1603 for (j, private_key_2) in private_keys.iter().enumerate() {
1604 if i != j {
1605 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1606 }
1607 }
1608 let certificate =
1609 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1610 current_certificates.insert(certificate);
1611 }
1612 round_to_certificates_map.insert(round, current_certificates.clone());
1614 previous_certificates = current_certificates.clone();
1615 }
1616 (round_to_certificates_map, committee)
1617 };
1618
1619 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1621 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1623 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1628 for i in 1..=commit_round {
1629 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1630 if i == commit_round {
1631 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1633 if let Some(c) = leader_certificate {
1634 pre_shutdown_certificates.push(c.clone());
1635 }
1636 continue;
1637 }
1638 pre_shutdown_certificates.extend(certificates);
1639 }
1640 for certificate in pre_shutdown_certificates.iter() {
1641 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1642 }
1643 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1645 Vec::new();
1646 for j in commit_round..=commit_round + 2 {
1647 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1648 post_shutdown_certificates.extend(certificate);
1649 }
1650 for certificate in post_shutdown_certificates.iter() {
1651 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1652 }
1653 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1655 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1656
1657 let account = Account::try_from(private_keys[0])?;
1659 let ledger_dir = default_ledger_dir(1, true, "0");
1660 let storage_mode = amareleo_storage_mode(ledger_dir);
1661 let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1662
1663 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1665
1666 for certificate in pre_shutdown_certificates.clone() {
1668 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1669 }
1670
1671 for certificate in post_shutdown_certificates.clone() {
1673 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1674 }
1675 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1677 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1678 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1679
1680 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1684
1685 let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1687
1688 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1690
1691 for certificate in post_shutdown_certificates.iter() {
1693 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1694 }
1695 for certificate in post_shutdown_certificates.clone() {
1696 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1697 }
1698 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1700 let commit_subdag_metadata_bootup =
1701 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1702 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1703 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1704
1705 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1709
1710 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1712 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1713 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1714 assert!(
1715 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1716 );
1717
1718 for certificate in pre_shutdown_certificates.clone() {
1720 let certificate_round = certificate.round();
1721 let certificate_id = certificate.id();
1722 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1724 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1725 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1728 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1729 }
1730
1731 for certificate in committed_certificates_bootup.clone() {
1733 let certificate_round = certificate.round();
1734 let certificate_id = certificate.id();
1735 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1737 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1738 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1741 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1742 }
1743
1744 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1746
1747 Ok(())
1748 }
1749
1750 #[tokio::test]
1751 #[tracing_test::traced_test]
1752 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1753 let rng = &mut TestRng::default();
1760 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1761
1762 let private_keys = vec![
1763 PrivateKey::new(rng_pks).unwrap(),
1764 PrivateKey::new(rng_pks).unwrap(),
1765 PrivateKey::new(rng_pks).unwrap(),
1766 PrivateKey::new(rng_pks).unwrap(),
1767 ];
1768 let address0 = Address::try_from(private_keys[0])?;
1769
1770 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1772 let committee_round = 0;
1773 let commit_round = 2;
1774 let current_round = commit_round + 1;
1775 let next_round = current_round + 1;
1776
1777 let (round_to_certificates_map, committee) = {
1779 let addresses = vec![
1780 Address::try_from(private_keys[0])?,
1781 Address::try_from(private_keys[1])?,
1782 Address::try_from(private_keys[2])?,
1783 Address::try_from(private_keys[3])?,
1784 ];
1785 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1786 committee_round,
1787 addresses,
1788 rng,
1789 );
1790 let mut round_to_certificates_map: IndexMap<
1792 u64,
1793 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1794 > = IndexMap::new();
1795 let mut previous_certificates = IndexSet::with_capacity(4);
1796 for _ in 0..4 {
1798 previous_certificates.insert(sample_batch_certificate(rng));
1799 }
1800 for round in 0..=commit_round + 2 {
1801 let mut current_certificates = IndexSet::new();
1802 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1803 IndexSet::new()
1804 } else {
1805 previous_certificates.iter().map(|c| c.id()).collect()
1806 };
1807 let transmission_ids =
1808 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1809 .into_iter()
1810 .collect::<IndexSet<_>>();
1811 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1812 let committee_id = committee.id();
1813 for (i, private_key_1) in private_keys.iter().enumerate() {
1814 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1815 private_key_1,
1816 round,
1817 timestamp,
1818 committee_id,
1819 transmission_ids.clone(),
1820 previous_certificate_ids.clone(),
1821 rng,
1822 )
1823 .unwrap();
1824 let mut signatures = IndexSet::with_capacity(4);
1825 for (j, private_key_2) in private_keys.iter().enumerate() {
1826 if i != j {
1827 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1828 }
1829 }
1830 let certificate =
1831 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1832 current_certificates.insert(certificate);
1833 }
1834 round_to_certificates_map.insert(round, current_certificates.clone());
1836 previous_certificates = current_certificates.clone();
1837 }
1838 (round_to_certificates_map, committee)
1839 };
1840
1841 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1843 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1845 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1850 for i in 1..=commit_round {
1851 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1852 if i == commit_round {
1853 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1855 if let Some(c) = leader_certificate {
1856 pre_shutdown_certificates.push(c.clone());
1857 }
1858 continue;
1859 }
1860 pre_shutdown_certificates.extend(certificates);
1861 }
1862 for certificate in pre_shutdown_certificates.iter() {
1863 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1864 }
1865 let account = Account::try_from(private_keys[0])?;
1867 let bootup_bft =
1868 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1869 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1871 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1873
1874 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1876 Vec::new();
1877 for j in commit_round..=commit_round + 2 {
1878 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1879 post_shutdown_certificates.extend(certificate);
1880 }
1881 for certificate in post_shutdown_certificates.iter() {
1882 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1883 }
1884
1885 for certificate in post_shutdown_certificates.clone() {
1887 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1888 }
1889
1890 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1892 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1893 let committed_certificates = commit_subdag.values().flatten();
1894
1895 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1897 for committed_certificate in committed_certificates.clone() {
1898 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1899 }
1900 }
1901 Ok(())
1902 }
1903}