1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36 console::account::Address,
37 ledger::{
38 block::Transaction,
39 committee::Committee,
40 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41 puzzle::{Solution, SolutionID},
42 },
43 prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50 parking_lot::{Mutex, RwLock},
51 tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56 collections::{BTreeMap, HashSet},
57 future::Future,
58 sync::{
59 Arc,
60 atomic::{AtomicI64, Ordering},
61 },
62};
63#[cfg(not(feature = "locktick"))]
64use tokio::sync::Mutex as TMutex;
65use tokio::{
66 sync::{OnceCell, oneshot},
67 task::JoinHandle,
68};
69use tracing::subscriber::DefaultGuard;
70
71#[derive(Clone)]
72pub struct BFT<N: Network> {
73 primary: Primary<N>,
75 dag: Arc<RwLock<DAG<N>>>,
77 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
79 leader_certificate_timer: Arc<AtomicI64>,
81 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
83 tracing: Option<TracingHandler>,
85 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
87 bft_lock: Arc<TMutex<()>>,
89}
90
91impl<N: Network> TracingHandlerGuard for BFT<N> {
92 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
94 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
95 }
96}
97
98impl<N: Network> BFT<N> {
99 pub fn new(
101 account: Account<N>,
102 storage: Storage<N>,
103 keep_state: bool,
104 storage_mode: StorageMode,
105 ledger: Arc<dyn LedgerService<N>>,
106 tracing: Option<TracingHandler>,
107 ) -> Result<Self> {
108 Ok(Self {
109 primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
110 dag: Default::default(),
111 leader_certificate: Default::default(),
112 leader_certificate_timer: Default::default(),
113 consensus_sender: Default::default(),
114 tracing: tracing.clone(),
115 handles: Default::default(),
116 bft_lock: Default::default(),
117 })
118 }
119
120 pub async fn run(
122 &mut self,
123 consensus_sender: Option<ConsensusSender<N>>,
124 primary_sender: PrimarySender<N>,
125 primary_receiver: PrimaryReceiver<N>,
126 ) -> Result<()> {
127 guard_info!(self, "Starting the BFT instance...");
128
129 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
131
132 self.start_handlers(bft_receiver);
134
135 let result = self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await;
137 if let Err(err) = result {
138 guard_error!(self, "BFT failed to run the primary instance - {err}");
139 self.shut_down().await;
140 return Err(err);
141 }
142
143 if let Some(consensus_sender) = consensus_sender {
147 let result = self.consensus_sender.set(consensus_sender);
148 if result.is_err() {
149 self.shut_down().await;
150 guard_error!(self, "Consensus sender already set");
151 bail!("Consensus sender already set");
152 }
153 }
154 Ok(())
155 }
156
157 pub fn is_synced(&self) -> bool {
159 self.primary.is_synced()
160 }
161
162 pub const fn primary(&self) -> &Primary<N> {
164 &self.primary
165 }
166
167 pub const fn storage(&self) -> &Storage<N> {
169 self.primary.storage()
170 }
171
172 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
174 self.primary.ledger()
175 }
176
177 pub fn leader(&self) -> Option<Address<N>> {
179 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
180 }
181
182 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
184 &self.leader_certificate
185 }
186}
187
188impl<N: Network> BFT<N> {
189 pub fn num_unconfirmed_transmissions(&self) -> usize {
191 self.primary.num_unconfirmed_transmissions()
192 }
193
194 pub fn num_unconfirmed_ratifications(&self) -> usize {
196 self.primary.num_unconfirmed_ratifications()
197 }
198
199 pub fn num_unconfirmed_solutions(&self) -> usize {
201 self.primary.num_unconfirmed_solutions()
202 }
203
204 pub fn num_unconfirmed_transactions(&self) -> usize {
206 self.primary.num_unconfirmed_transactions()
207 }
208}
209
210impl<N: Network> BFT<N> {
211 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
213 self.primary.worker_transmission_ids()
214 }
215
216 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
218 self.primary.worker_transmissions()
219 }
220
221 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
223 self.primary.worker_solutions()
224 }
225
226 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
228 self.primary.worker_transactions()
229 }
230}
231
232impl<N: Network> BFT<N> {
233 fn update_to_next_round(&self, current_round: u64) -> bool {
235 let storage_round = self.storage().current_round();
237 if current_round < storage_round {
238 guard_debug!(
239 self,
240 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
241 );
242 return false;
243 }
244
245 let is_ready = match current_round % 2 == 0 {
247 true => self.update_leader_certificate_to_even_round(current_round),
248 false => self.is_leader_quorum_or_nonleaders_available(current_round),
249 };
250
251 #[cfg(feature = "metrics")]
252 {
253 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
254 if start > 0 {
256 let end = now();
257 let elapsed = std::time::Duration::from_secs((end - start) as u64);
258 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
259 }
260 }
261
262 if current_round % 2 == 0 {
264 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
266 if !is_ready {
268 guard_trace!(self, is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
269 }
270 let leader_round = leader_certificate.round();
272 match leader_round == current_round {
273 true => {
274 guard_info!(
275 self,
276 "\n\nRound {current_round} elected a leader - {}\n",
277 leader_certificate.author()
278 );
279 #[cfg(feature = "metrics")]
280 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
281 }
282 false => {
283 guard_warn!(self, "BFT failed to elect a leader for round {current_round} (!= {leader_round})")
284 }
285 }
286 } else {
287 match is_ready {
288 true => guard_info!(self, "\n\nRound {current_round} reached quorum without a leader\n"),
289 false => {
290 guard_info!(self, "{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed())
291 }
292 }
293 }
294 }
295
296 if is_ready {
298 if let Err(e) = self.storage().increment_to_next_round(current_round) {
300 guard_warn!(self, "BFT failed to increment to the next round from round {current_round} - {e}");
301 return false;
302 }
303 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
305 }
306
307 is_ready
308 }
309
310 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
316 let current_round = self.storage().current_round();
318 if current_round != even_round {
320 guard_warn!(
321 self,
322 "BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"
323 );
324 return false;
325 }
326
327 if current_round % 2 != 0 || current_round < 2 {
329 guard_error!(self, "BFT cannot update the leader certificate in an odd round");
330 return false;
331 }
332
333 let current_certificates = self.storage().get_certificates_for_round(current_round);
335 if current_certificates.is_empty() {
337 *self.leader_certificate.write() = None;
339 return false;
340 }
341
342 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
344 Ok(committee) => committee,
345 Err(e) => {
346 guard_error!(
347 self,
348 "BFT failed to retrieve the committee lookback for the even round {current_round} - {e}"
349 );
350 return false;
351 }
352 };
353 let leader = match self.ledger().latest_leader() {
355 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
356 _ => {
357 let computed_leader = self.primary.account().address();
359 self.ledger().update_latest_leader(current_round, computed_leader);
369
370 computed_leader
371 }
372 };
373 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
375 *self.leader_certificate.write() = leader_certificate.cloned();
376
377 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
378 }
379
380 fn is_even_round_ready_for_next_round(
384 &self,
385 certificates: IndexSet<BatchCertificate<N>>,
386 committee: Committee<N>,
387 current_round: u64,
388 ) -> bool {
389 let authors = certificates.into_iter().map(|c| c.author()).collect();
391 if !committee.is_quorum_threshold_reached(&authors) {
393 guard_trace!(self, "BFT failed to reach quorum threshold in even round {current_round}");
394 return false;
395 }
396 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
398 if leader_certificate.round() == current_round {
399 return true;
400 }
401 }
402 if self.is_timer_expired() {
404 guard_debug!(
405 self,
406 "BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"
407 );
408 return true;
409 }
410 false
412 }
413
414 fn is_timer_expired(&self) -> bool {
416 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
417 }
418
419 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
424 let current_round = self.storage().current_round();
426 if current_round != odd_round {
428 guard_warn!(
429 self,
430 "BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"
431 );
432 return false;
433 }
434 if current_round % 2 != 1 {
436 guard_error!(self, "BFT does not compute stakes for the leader certificate in an even round");
437 return false;
438 }
439 let current_certificates = self.storage().get_certificates_for_round(current_round);
441 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
443 Ok(committee) => committee,
444 Err(e) => {
445 guard_error!(
446 self,
447 "BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}"
448 );
449 return false;
450 }
451 };
452 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
454 if !committee_lookback.is_quorum_threshold_reached(&authors) {
456 guard_trace!(self, "BFT failed reach quorum threshold in odd round {current_round}. ");
457 return false;
458 }
459 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
461 return true;
463 };
464 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
466 leader_certificate.id(),
467 current_certificates,
468 &committee_lookback,
469 );
470 stake_with_leader >= committee_lookback.availability_threshold()
472 || stake_without_leader >= committee_lookback.quorum_threshold()
473 || self.is_timer_expired()
474 }
475
476 fn compute_stake_for_leader_certificate(
478 &self,
479 leader_certificate_id: Field<N>,
480 current_certificates: IndexSet<BatchCertificate<N>>,
481 current_committee: &Committee<N>,
482 ) -> (u64, u64) {
483 if current_certificates.is_empty() {
485 return (0, 0);
486 }
487
488 let mut stake_with_leader = 0u64;
490 let mut stake_without_leader = 0u64;
492 for certificate in current_certificates {
494 let stake = current_committee.get_stake(certificate.author());
496 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
498 true => stake_with_leader = stake_with_leader.saturating_add(stake),
500 false => stake_without_leader = stake_without_leader.saturating_add(stake),
502 }
503 }
504 (stake_with_leader, stake_without_leader)
506 }
507}
508
509impl<N: Network> BFT<N> {
510 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
512 &self,
513 certificate: BatchCertificate<N>,
514 ) -> Result<()> {
515 let _lock = self.bft_lock.lock().await;
517 let certificate_round = certificate.round();
519 self.dag.write().insert(certificate, self);
521
522 let commit_round = certificate_round.saturating_sub(1);
524 if commit_round % 2 != 0 || commit_round < 2 {
526 return Ok(());
527 }
528 if commit_round <= self.dag.read().last_committed_round() {
530 return Ok(());
531 }
532
533 guard_info!(self, "Checking if the leader is ready to be committed for round {commit_round}...");
535
536 let Ok(_committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
538 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
539 };
540
541 let leader = match self.ledger().latest_leader() {
543 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
544 _ => {
545 let computed_leader = self.primary.account().address();
547 self.ledger().update_latest_leader(commit_round, computed_leader);
553
554 computed_leader
555 }
556 };
557
558 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
560 else {
561 guard_trace!(self, "BFT did not find the leader certificate for commit round {commit_round} yet");
562 return Ok(());
563 };
564 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
566 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
568 };
569 let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
571 else {
572 bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
573 };
574 let authors = certificates
576 .values()
577 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
578 true => Some(c.author()),
579 false => None,
580 })
581 .collect();
582 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
584 guard_trace!(self, "BFT is not ready to commit {commit_round}");
586 return Ok(());
587 }
588
589 guard_info!(self, "Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
591
592 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
594 }
595
596 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
598 &self,
599 leader_certificate: BatchCertificate<N>,
600 ) -> Result<()> {
601 let latest_leader_round = leader_certificate.round();
603 let mut leader_certificates = vec![leader_certificate.clone()];
606 {
607 let leader_round = leader_certificate.round();
609
610 let mut current_certificate = leader_certificate;
611 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
612 {
613 let leader = match self.ledger().latest_leader() {
623 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
624 _ => {
625 let computed_leader = self.primary.account().address();
627 self.ledger().update_latest_leader(round, computed_leader);
636
637 computed_leader
638 }
639 };
640 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
642 else {
643 continue;
644 };
645 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
647 leader_certificates.push(previous_certificate.clone());
649 current_certificate = previous_certificate;
651 }
652 }
653 }
654
655 for leader_certificate in leader_certificates.into_iter().rev() {
657 let leader_round = leader_certificate.round();
659 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
661 Ok(subdag) => subdag,
662 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
663 };
664 if !IS_SYNCING {
666 let mut transmissions = IndexMap::new();
668 let mut seen_transaction_ids = IndexSet::new();
670 let mut seen_solution_ids = IndexSet::new();
672 for certificate in commit_subdag.values().flatten() {
674 for transmission_id in certificate.transmission_ids() {
676 match transmission_id {
680 TransmissionID::Solution(solution_id, _) => {
681 if seen_solution_ids.contains(&solution_id) {
683 continue;
684 }
685 }
686 TransmissionID::Transaction(transaction_id, _) => {
687 if seen_transaction_ids.contains(transaction_id) {
689 continue;
690 }
691 }
692 TransmissionID::Ratification => {
693 bail!("Ratifications are currently not supported in the BFT.")
694 }
695 }
696 if transmissions.contains_key(transmission_id) {
698 continue;
699 }
700 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
703 continue;
704 }
705 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
707 bail!(
708 "BFT failed to retrieve transmission '{}.{}' from round {}",
709 fmt_id(transmission_id),
710 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
711 certificate.round()
712 );
713 };
714 match transmission_id {
716 TransmissionID::Solution(id, _) => {
717 seen_solution_ids.insert(id);
718 }
719 TransmissionID::Transaction(id, _) => {
720 seen_transaction_ids.insert(id);
721 }
722 TransmissionID::Ratification => {}
723 }
724 transmissions.insert(*transmission_id, transmission);
726 }
727 }
728 let subdag = Subdag::from(commit_subdag.clone())?;
731 let anchor_round = subdag.anchor_round();
733 let num_transmissions = transmissions.len();
735 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
737
738 ensure!(
740 anchor_round == leader_round,
741 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
742 );
743
744 if let Some(consensus_sender) = self.consensus_sender.get() {
746 let (callback_sender, callback_receiver) = oneshot::channel();
748 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
750 match callback_receiver.await {
752 Ok(Ok(())) => (), Ok(Err(e)) => {
754 guard_error!(self, "BFT failed to advance the subdag for round {anchor_round} - {e}");
755 return Ok(());
756 }
757 Err(e) => {
758 guard_error!(self, "BFT failed to receive the callback for round {anchor_round} - {e}");
759 return Ok(());
760 }
761 }
762 }
763
764 guard_info!(
765 self,
766 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
767 );
768 }
769
770 let mut dag_write = self.dag.write();
772 for certificate in commit_subdag.values().flatten() {
773 dag_write.commit(certificate, self.storage().max_gc_rounds());
774 }
775 }
776
777 self.storage().garbage_collect_certificates(latest_leader_round);
793
794 Ok(())
795 }
796
797 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
799 &self,
800 leader_certificate: BatchCertificate<N>,
801 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
802 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
804 let mut already_ordered = HashSet::new();
806 let mut buffer = vec![leader_certificate];
808 while let Some(certificate) = buffer.pop() {
810 commit.entry(certificate.round()).or_default().insert(certificate.clone());
812
813 let previous_round = certificate.round().saturating_sub(1);
818 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
819 continue;
820 }
821 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
825 if already_ordered.contains(previous_certificate_id) {
827 continue;
828 }
829 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
831 continue;
832 }
833 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
835 continue;
836 }
837
838 let previous_certificate = {
840 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
842 Some(previous_certificate) => previous_certificate,
844 None => match self.storage().get_certificate(*previous_certificate_id) {
846 Some(previous_certificate) => previous_certificate,
848 None => bail!(
850 "Missing previous certificate {} for round {previous_round}",
851 fmt_id(previous_certificate_id)
852 ),
853 },
854 }
855 };
856 already_ordered.insert(previous_certificate.id());
858 buffer.push(previous_certificate);
860 }
861 }
862 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
864 Ok(commit)
866 }
867
868 fn is_linked(
870 &self,
871 previous_certificate: BatchCertificate<N>,
872 current_certificate: BatchCertificate<N>,
873 ) -> Result<bool> {
874 let mut traversal = vec![current_certificate.clone()];
876 for round in (previous_certificate.round()..current_certificate.round()).rev() {
878 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
880 bail!("BFT failed to retrieve the certificates for past round {round}");
883 };
884 traversal = certificates
886 .into_values()
887 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
888 .collect();
889 }
890 Ok(traversal.contains(&previous_certificate))
891 }
892}
893
894impl<N: Network> BFT<N> {
895 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
897 let BFTReceiver { mut rx_primary_round, mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup } =
898 bft_receiver;
899
900 let self_ = self.clone();
902 self.spawn(async move {
903 while let Some((current_round, callback)) = rx_primary_round.recv().await {
904 callback.send(self_.update_to_next_round(current_round)).ok();
905 }
906 });
907
908 let self_ = self.clone();
910 self.spawn(async move {
911 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
912 let result = self_.update_dag::<true, false>(certificate).await;
914 callback.send(result).ok();
917 }
918 });
919
920 let self_ = self.clone();
922 self.spawn(async move {
923 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
924 self_.sync_bft_dag_at_bootup(certificates).await;
925 }
926 });
927 }
928
929 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
936 let mut dag = self.dag.write();
938
939 for certificate in certificates {
941 dag.commit(&certificate, self.storage().max_gc_rounds());
942 }
943 }
944
945 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
947 self.handles.lock().push(tokio::spawn(future));
948 }
949
950 pub async fn shut_down(&self) {
952 guard_info!(self, "Shutting down the BFT...");
953 let _lock = self.bft_lock.lock().await;
955
956 self.primary.shut_down().await;
958
959 let mut handles = self.handles.lock();
961 handles.iter().for_each(|handle| handle.abort());
962 handles.clear();
963 }
964}
965
966#[cfg(test)]
967mod tests {
968 use crate::{
969 BFT,
970 DEVELOPMENT_MODE_RNG_SEED,
971 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
972 helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
973 };
974
975 use amareleo_chain_account::Account;
976 use amareleo_node_bft_ledger_service::MockLedgerService;
977 use amareleo_node_bft_storage_service::BFTMemoryService;
978 use snarkvm::{
979 console::account::{Address, PrivateKey},
980 ledger::{
981 committee::Committee,
982 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
983 },
984 utilities::TestRng,
985 };
986
987 use aleo_std::StorageMode;
988 use anyhow::Result;
989 use indexmap::{IndexMap, IndexSet};
990 use rand::SeedableRng;
991 use rand_chacha::ChaChaRng;
992 use std::sync::Arc;
993
994 type CurrentNetwork = snarkvm::console::network::MainnetV0;
995
996 fn sample_test_instance(
998 committee_round: Option<u64>,
999 max_gc_rounds: u64,
1000 rng: &mut TestRng,
1001 ) -> (
1002 Committee<CurrentNetwork>,
1003 Account<CurrentNetwork>,
1004 Arc<MockLedgerService<CurrentNetwork>>,
1005 Storage<CurrentNetwork>,
1006 ) {
1007 let committee = match committee_round {
1008 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
1009 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
1010 };
1011 let account = Account::new(rng).unwrap();
1012 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1013 let transmissions = Arc::new(BFTMemoryService::new());
1014 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1015
1016 (committee, account, ledger, storage)
1017 }
1018
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample four round-1 certificates without previous certificate IDs.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
    }

    // Build a round-1 committee out of the four certificate authors.
    let members = certificates.iter().map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(1, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
    assert!(bft.is_timer_expired());

    // With no certificates in storage, quorum cannot be reached.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(1));

    // Once all four certificates are stored, quorum is reached and no leader is set.
    for certificate in &certificates {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    // With a leader certificate set, the check still passes.
    let leader_certificate = sample_batch_certificate(rng);
    *bft.leader_certificate.write() = Some(leader_certificate);
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    Ok(())
}
1072
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
    let rng = &mut TestRng::default();

    // Committee and storage both start at round 1.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // Storage is at round 1, so asking about round 2 is out of sync.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(2));
    Ok(())
}
1094
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even() -> Result<()> {
    let rng = &mut TestRng::default();

    // Committee and storage both start at round 2.
    let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
    assert_eq!(committee.starting_round(), 2);
    assert_eq!(storage.current_round(), 2);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // Round 2 is even, so the odd-round check must reject it.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(2));
    Ok(())
}
1115
#[test]
#[tracing_test::traced_test]
fn test_is_even_round_ready() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample four round-2 certificates.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(sample_batch_certificate_for_round(2, rng));
    }

    // Build a round-2 committee out of the four certificate authors.
    let members = certificates.iter().map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;

    // Set a leader certificate for round 2.
    let leader_certificate = sample_batch_certificate_for_round(2, rng);
    *bft.leader_certificate.write() = Some(leader_certificate);

    // Without certificates there is no quorum.
    assert!(!bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2));
    // With quorum and a leader certificate for the round, the round is ready.
    assert!(bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2));

    // A second instance without a leader certificate depends on the timer alone.
    let bft_timer =
        BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
    let ready_before_wait = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if !bft_timer.is_timer_expired() {
        assert!(!ready_before_wait);
    }

    // Wait for the leader-certificate timeout, then check again.
    let leader_certificate_timeout = std::time::Duration::from_secs(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64);
    std::thread::sleep(leader_certificate_timeout);
    let ready_after_wait = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if bft_timer.is_timer_expired() {
        assert!(ready_after_wait);
    } else {
        assert!(!ready_after_wait);
    }

    Ok(())
}
1180
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_odd() -> Result<()> {
    let rng = &mut TestRng::default();

    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // An odd round must never yield a leader certificate update.
    assert!(!bft.update_leader_certificate_to_even_round(1));
    Ok(())
}
1198
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_bad_round() -> Result<()> {
    let rng = &mut TestRng::default();

    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // Round 6 is expected to be rejected (presumably storage is not at round 6 here).
    assert!(!bft.update_leader_certificate_to_even_round(6));
    Ok(())
}
1216
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample a round-3 certificate together with its previous (round-2) certificates.
    let current_round = 3;
    let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
        current_round,
        rng,
    );

    // Build a round-2 committee out of the first four certificate authors.
    let members = certificates.iter().take(4).map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));

    // Store the first four certificates; storage is then expected to be at round 2.
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    for certificate in certificates.iter().take(4) {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert_eq!(storage.current_round(), 2);

    // Treat the first certificate's author as the leader.
    let leader = certificates[0].author();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();

    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;

    *bft.leader_certificate.write() = Some(leader_certificate);

    // Updating the leader certificate for round 2 should report the round as ready.
    assert!(bft.update_leader_certificate_to_even_round(2));

    Ok(())
}
1273
1274 #[tokio::test]
1275 #[tracing_test::traced_test]
1276 async fn test_order_dag_with_dfs() -> Result<()> {
1277 let rng = &mut TestRng::default();
1278
1279 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1281
1282 let previous_round = 2; let current_round = previous_round + 1;
1285
1286 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1288 current_round,
1289 rng,
1290 );
1291
1292 {
1296 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1298 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1300
1301 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1303
1304 for certificate in previous_certificates.clone() {
1306 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1307 }
1308
1309 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1311 assert!(result.is_ok());
1312 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1313 assert_eq!(candidate_certificates.len(), 1);
1314 let expected_certificates = vec![certificate.clone()];
1315 assert_eq!(
1316 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1317 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1318 );
1319 assert_eq!(candidate_certificates, expected_certificates);
1320 }
1321
1322 {
1326 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1328 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1330
1331 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1333
1334 for certificate in previous_certificates.clone() {
1336 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1337 }
1338
1339 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1341 assert!(result.is_ok());
1342 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1343 assert_eq!(candidate_certificates.len(), 5);
1344 let expected_certificates = vec![
1345 previous_certificates[0].clone(),
1346 previous_certificates[1].clone(),
1347 previous_certificates[2].clone(),
1348 previous_certificates[3].clone(),
1349 certificate,
1350 ];
1351 assert_eq!(
1352 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1353 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1354 );
1355 assert_eq!(candidate_certificates, expected_certificates);
1356 }
1357
1358 Ok(())
1359 }
1360
/// Checks that `order_dag_with_dfs` fails with a "Missing previous certificate" error when
/// none of the previous certificates were added to the BFT.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample the fixtures and sanity-check the committee and storage they start with.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, &mut rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    // The certificate under test lives one round after its previous certificates.
    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample a certificate for the current round and record the IDs of its previous certificates.
    let (certificate, previous_certificates) =
        snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
            current_round,
            &mut rng,
        );
    let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();

    // Build the BFT without inserting any of the previous certificates.
    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // The error is expected to name the previous certificate at index 3.
    let expected_error = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    // Ordering must fail with exactly that message.
    let outcome = bft.order_dag_with_dfs::<false>(certificate);
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().to_string(), expected_error);
    Ok(())
}
1401
1402 #[tokio::test]
1403 #[tracing_test::traced_test]
1404 async fn test_bft_gc_on_commit() -> Result<()> {
1405 let rng = &mut TestRng::default();
1406
1407 let max_gc_rounds = 1;
1409 let committee_round = 0;
1410 let commit_round = 2;
1411 let current_round = commit_round + 1;
1412
1413 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1415 current_round,
1416 rng,
1417 );
1418
1419 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1421 committee_round,
1422 vec![
1423 certificates[0].author(),
1424 certificates[1].author(),
1425 certificates[2].author(),
1426 certificates[3].author(),
1427 ],
1428 rng,
1429 );
1430
1431 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1433
1434 let transmissions = Arc::new(BFTMemoryService::new());
1436 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1437 for certificate in certificates.iter() {
1439 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1440 }
1441
1442 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1445
1446 let account = Account::new(rng)?;
1448 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1449 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1451
1452 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1454
1455 for certificate in certificates {
1457 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1458 }
1459
1460 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1462
1463 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1465
1466 Ok(())
1467 }
1468
1469 #[tokio::test]
1470 #[tracing_test::traced_test]
1471 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1472 let rng = &mut TestRng::default();
1473
1474 let max_gc_rounds = 1;
1476 let committee_round = 0;
1477 let commit_round = 2;
1478 let current_round = commit_round + 1;
1479
1480 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1482 current_round,
1483 rng,
1484 );
1485
1486 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1488 committee_round,
1489 vec![
1490 certificates[0].author(),
1491 certificates[1].author(),
1492 certificates[2].author(),
1493 certificates[3].author(),
1494 ],
1495 rng,
1496 );
1497
1498 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1500
1501 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1503 for certificate in certificates.iter() {
1505 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1506 }
1507
1508 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1511
1512 let account = Account::new(rng)?;
1514 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1515
1516 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1518
1519 for certificate in certificates.clone() {
1521 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1522 }
1523
1524 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1526
1527 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1531 let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1533
1534 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1536
1537 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1539
1540 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1542 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1543
1544 for certificate in certificates {
1546 let certificate_round = certificate.round();
1547 let certificate_id = certificate.id();
1548 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1550 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1553 }
1554
1555 Ok(())
1556 }
1557
1558 #[tokio::test]
1559 #[tracing_test::traced_test]
1560 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1561 let rng = &mut TestRng::default();
1568 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1569
1570 let private_keys = vec![
1571 PrivateKey::new(rng_pks).unwrap(),
1572 PrivateKey::new(rng_pks).unwrap(),
1573 PrivateKey::new(rng_pks).unwrap(),
1574 PrivateKey::new(rng_pks).unwrap(),
1575 ];
1576 let address0 = Address::try_from(private_keys[0])?;
1577
1578 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1580 let committee_round = 0;
1581 let commit_round = 2;
1582 let current_round = commit_round + 1;
1583 let next_round = current_round + 1;
1584
1585 let (round_to_certificates_map, committee) = {
1587 let addresses = vec![
1588 Address::try_from(private_keys[0])?,
1589 Address::try_from(private_keys[1])?,
1590 Address::try_from(private_keys[2])?,
1591 Address::try_from(private_keys[3])?,
1592 ];
1593 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1594 committee_round,
1595 addresses,
1596 rng,
1597 );
1598 let mut round_to_certificates_map: IndexMap<
1600 u64,
1601 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1602 > = IndexMap::new();
1603 let mut previous_certificates = IndexSet::with_capacity(4);
1604 for _ in 0..4 {
1606 previous_certificates.insert(sample_batch_certificate(rng));
1607 }
1608 for round in 0..commit_round + 3 {
1609 let mut current_certificates = IndexSet::new();
1610 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1611 IndexSet::new()
1612 } else {
1613 previous_certificates.iter().map(|c| c.id()).collect()
1614 };
1615 let transmission_ids =
1616 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1617 .into_iter()
1618 .collect::<IndexSet<_>>();
1619 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1620 let committee_id = committee.id();
1621 for (i, private_key_1) in private_keys.iter().enumerate() {
1622 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1623 private_key_1,
1624 round,
1625 timestamp,
1626 committee_id,
1627 transmission_ids.clone(),
1628 previous_certificate_ids.clone(),
1629 rng,
1630 )
1631 .unwrap();
1632 let mut signatures = IndexSet::with_capacity(4);
1633 for (j, private_key_2) in private_keys.iter().enumerate() {
1634 if i != j {
1635 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1636 }
1637 }
1638 let certificate =
1639 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1640 current_certificates.insert(certificate);
1641 }
1642 round_to_certificates_map.insert(round, current_certificates.clone());
1644 previous_certificates = current_certificates.clone();
1645 }
1646 (round_to_certificates_map, committee)
1647 };
1648
1649 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1651 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1653 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1658 for i in 1..=commit_round {
1659 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1660 if i == commit_round {
1661 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1663 if let Some(c) = leader_certificate {
1664 pre_shutdown_certificates.push(c.clone());
1665 }
1666 continue;
1667 }
1668 pre_shutdown_certificates.extend(certificates);
1669 }
1670 for certificate in pre_shutdown_certificates.iter() {
1671 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1672 }
1673 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1675 Vec::new();
1676 for j in commit_round..=commit_round + 2 {
1677 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1678 post_shutdown_certificates.extend(certificate);
1679 }
1680 for certificate in post_shutdown_certificates.iter() {
1681 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1682 }
1683 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1685 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1686
1687 let account = Account::try_from(private_keys[0])?;
1689 let ledger_dir = default_ledger_dir(1, true, "0");
1690 let storage_mode = amareleo_storage_mode(ledger_dir);
1691 let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1692
1693 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1695
1696 for certificate in pre_shutdown_certificates.clone() {
1698 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1699 }
1700
1701 for certificate in post_shutdown_certificates.clone() {
1703 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1704 }
1705 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1707 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1708 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1709
1710 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1714
1715 let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1717
1718 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1720
1721 for certificate in post_shutdown_certificates.iter() {
1723 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1724 }
1725 for certificate in post_shutdown_certificates.clone() {
1726 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1727 }
1728 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1730 let commit_subdag_metadata_bootup =
1731 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1732 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1733 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1734
1735 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1739
1740 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1742 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1743 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1744 assert!(
1745 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1746 );
1747
1748 for certificate in pre_shutdown_certificates.clone() {
1750 let certificate_round = certificate.round();
1751 let certificate_id = certificate.id();
1752 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1755 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1759 }
1760
1761 for certificate in committed_certificates_bootup.clone() {
1763 let certificate_round = certificate.round();
1764 let certificate_id = certificate.id();
1765 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1767 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1768 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1771 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1772 }
1773
1774 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1776
1777 Ok(())
1778 }
1779
1780 #[tokio::test]
1781 #[tracing_test::traced_test]
1782 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1783 let rng = &mut TestRng::default();
1790 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1791
1792 let private_keys = vec![
1793 PrivateKey::new(rng_pks).unwrap(),
1794 PrivateKey::new(rng_pks).unwrap(),
1795 PrivateKey::new(rng_pks).unwrap(),
1796 PrivateKey::new(rng_pks).unwrap(),
1797 ];
1798 let address0 = Address::try_from(private_keys[0])?;
1799
1800 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1802 let committee_round = 0;
1803 let commit_round = 2;
1804 let current_round = commit_round + 1;
1805 let next_round = current_round + 1;
1806
1807 let (round_to_certificates_map, committee) = {
1809 let addresses = vec![
1810 Address::try_from(private_keys[0])?,
1811 Address::try_from(private_keys[1])?,
1812 Address::try_from(private_keys[2])?,
1813 Address::try_from(private_keys[3])?,
1814 ];
1815 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1816 committee_round,
1817 addresses,
1818 rng,
1819 );
1820 let mut round_to_certificates_map: IndexMap<
1822 u64,
1823 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1824 > = IndexMap::new();
1825 let mut previous_certificates = IndexSet::with_capacity(4);
1826 for _ in 0..4 {
1828 previous_certificates.insert(sample_batch_certificate(rng));
1829 }
1830 for round in 0..=commit_round + 2 {
1831 let mut current_certificates = IndexSet::new();
1832 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1833 IndexSet::new()
1834 } else {
1835 previous_certificates.iter().map(|c| c.id()).collect()
1836 };
1837 let transmission_ids =
1838 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1839 .into_iter()
1840 .collect::<IndexSet<_>>();
1841 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1842 let committee_id = committee.id();
1843 for (i, private_key_1) in private_keys.iter().enumerate() {
1844 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1845 private_key_1,
1846 round,
1847 timestamp,
1848 committee_id,
1849 transmission_ids.clone(),
1850 previous_certificate_ids.clone(),
1851 rng,
1852 )
1853 .unwrap();
1854 let mut signatures = IndexSet::with_capacity(4);
1855 for (j, private_key_2) in private_keys.iter().enumerate() {
1856 if i != j {
1857 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1858 }
1859 }
1860 let certificate =
1861 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1862 current_certificates.insert(certificate);
1863 }
1864 round_to_certificates_map.insert(round, current_certificates.clone());
1866 previous_certificates = current_certificates.clone();
1867 }
1868 (round_to_certificates_map, committee)
1869 };
1870
1871 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1873 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1875 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1880 for i in 1..=commit_round {
1881 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1882 if i == commit_round {
1883 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1885 if let Some(c) = leader_certificate {
1886 pre_shutdown_certificates.push(c.clone());
1887 }
1888 continue;
1889 }
1890 pre_shutdown_certificates.extend(certificates);
1891 }
1892 for certificate in pre_shutdown_certificates.iter() {
1893 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1894 }
1895 let account = Account::try_from(private_keys[0])?;
1897 let bootup_bft =
1898 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1899 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1901 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1903
1904 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1906 Vec::new();
1907 for j in commit_round..=commit_round + 2 {
1908 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1909 post_shutdown_certificates.extend(certificate);
1910 }
1911 for certificate in post_shutdown_certificates.iter() {
1912 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1913 }
1914
1915 for certificate in post_shutdown_certificates.clone() {
1917 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1918 }
1919
1920 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1922 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1923 let committed_certificates = commit_subdag.values().flatten();
1924
1925 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1927 for committed_certificate in committed_certificates.clone() {
1928 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1929 }
1930 }
1931 Ok(())
1932 }
1933}