1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_node_bft_ledger_service::LedgerService;
34use snarkvm::{
35 console::account::Address,
36 ledger::{
37 block::Transaction,
38 committee::Committee,
39 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
40 puzzle::{Solution, SolutionID},
41 },
42 prelude::{Field, Network, Result, bail, ensure},
43};
44
45use colored::Colorize;
46use indexmap::{IndexMap, IndexSet};
47use parking_lot::{Mutex, RwLock};
48use std::{
49 collections::{BTreeMap, HashSet},
50 future::Future,
51 sync::{
52 Arc,
53 atomic::{AtomicI64, Ordering},
54 },
55};
56use tokio::{
57 sync::{Mutex as TMutex, OnceCell, oneshot},
58 task::JoinHandle,
59};
60
61#[derive(Clone)]
62pub struct BFT<N: Network> {
63 primary: Primary<N>,
65 dag: Arc<RwLock<DAG<N>>>,
67 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
69 leader_certificate_timer: Arc<AtomicI64>,
71 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
73 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
75 lock: Arc<TMutex<()>>,
77}
78
79impl<N: Network> BFT<N> {
80 pub fn new(
82 account: Account<N>,
83 storage: Storage<N>,
84 keep_state: bool,
85 storage_mode: StorageMode,
86 ledger: Arc<dyn LedgerService<N>>,
87 ) -> Result<Self> {
88 Ok(Self {
89 primary: Primary::new(account, storage, keep_state, storage_mode, ledger)?,
90 dag: Default::default(),
91 leader_certificate: Default::default(),
92 leader_certificate_timer: Default::default(),
93 consensus_sender: Default::default(),
94 handles: Default::default(),
95 lock: Default::default(),
96 })
97 }
98
99 pub async fn run(
101 &mut self,
102 consensus_sender: Option<ConsensusSender<N>>,
103 primary_sender: PrimarySender<N>,
104 primary_receiver: PrimaryReceiver<N>,
105 ) -> Result<()> {
106 info!("Starting the BFT instance...");
107 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
109 self.start_handlers(bft_receiver);
111 self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
113 if let Some(consensus_sender) = consensus_sender {
116 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
117 }
118 Ok(())
119 }
120
121 pub fn is_synced(&self) -> bool {
123 self.primary.is_synced()
124 }
125
126 pub const fn primary(&self) -> &Primary<N> {
128 &self.primary
129 }
130
131 pub const fn storage(&self) -> &Storage<N> {
133 self.primary.storage()
134 }
135
136 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
138 self.primary.ledger()
139 }
140
141 pub fn leader(&self) -> Option<Address<N>> {
143 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
144 }
145
146 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
148 &self.leader_certificate
149 }
150}
151
152impl<N: Network> BFT<N> {
153 pub fn num_unconfirmed_transmissions(&self) -> usize {
155 self.primary.num_unconfirmed_transmissions()
156 }
157
158 pub fn num_unconfirmed_ratifications(&self) -> usize {
160 self.primary.num_unconfirmed_ratifications()
161 }
162
163 pub fn num_unconfirmed_solutions(&self) -> usize {
165 self.primary.num_unconfirmed_solutions()
166 }
167
168 pub fn num_unconfirmed_transactions(&self) -> usize {
170 self.primary.num_unconfirmed_transactions()
171 }
172}
173
174impl<N: Network> BFT<N> {
175 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
177 self.primary.worker_transmission_ids()
178 }
179
180 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
182 self.primary.worker_transmissions()
183 }
184
185 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
187 self.primary.worker_solutions()
188 }
189
190 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
192 self.primary.worker_transactions()
193 }
194}
195
196impl<N: Network> BFT<N> {
197 fn update_to_next_round(&self, current_round: u64) -> bool {
199 let storage_round = self.storage().current_round();
201 if current_round < storage_round {
202 debug!(
203 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
204 );
205 return false;
206 }
207
208 let is_ready = match current_round % 2 == 0 {
210 true => self.update_leader_certificate_to_even_round(current_round),
211 false => self.is_leader_quorum_or_nonleaders_available(current_round),
212 };
213
214 #[cfg(feature = "metrics")]
215 {
216 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
217 if start > 0 {
219 let end = now();
220 let elapsed = std::time::Duration::from_secs((end - start) as u64);
221 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
222 }
223 }
224
225 if current_round % 2 == 0 {
227 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
229 if !is_ready {
231 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
232 }
233 let leader_round = leader_certificate.round();
235 match leader_round == current_round {
236 true => {
237 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
238 #[cfg(feature = "metrics")]
239 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
240 }
241 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
242 }
243 } else {
244 match is_ready {
245 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
246 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
247 }
248 }
249 }
250
251 if is_ready {
253 if let Err(e) = self.storage().increment_to_next_round(current_round) {
255 warn!("BFT failed to increment to the next round from round {current_round} - {e}");
256 return false;
257 }
258 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
260 }
261
262 is_ready
263 }
264
265 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
271 let current_round = self.storage().current_round();
273 if current_round != even_round {
275 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
276 return false;
277 }
278
279 if current_round % 2 != 0 || current_round < 2 {
281 error!("BFT cannot update the leader certificate in an odd round");
282 return false;
283 }
284
285 let current_certificates = self.storage().get_certificates_for_round(current_round);
287 if current_certificates.is_empty() {
289 *self.leader_certificate.write() = None;
291 return false;
292 }
293
294 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
296 Ok(committee) => committee,
297 Err(e) => {
298 error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
299 return false;
300 }
301 };
302 let leader = match self.ledger().latest_leader() {
304 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
305 _ => {
306 let computed_leader = self.primary.account().address();
308 self.ledger().update_latest_leader(current_round, computed_leader);
318
319 computed_leader
320 }
321 };
322 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
324 *self.leader_certificate.write() = leader_certificate.cloned();
325
326 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
327 }
328
329 fn is_even_round_ready_for_next_round(
333 &self,
334 certificates: IndexSet<BatchCertificate<N>>,
335 committee: Committee<N>,
336 current_round: u64,
337 ) -> bool {
338 let authors = certificates.into_iter().map(|c| c.author()).collect();
340 if !committee.is_quorum_threshold_reached(&authors) {
342 trace!("BFT failed to reach quorum threshold in even round {current_round}");
343 return false;
344 }
345 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
347 if leader_certificate.round() == current_round {
348 return true;
349 }
350 }
351 if self.is_timer_expired() {
353 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
354 return true;
355 }
356 false
358 }
359
360 fn is_timer_expired(&self) -> bool {
362 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
363 }
364
365 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
370 let current_round = self.storage().current_round();
372 if current_round != odd_round {
374 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
375 return false;
376 }
377 if current_round % 2 != 1 {
379 error!("BFT does not compute stakes for the leader certificate in an even round");
380 return false;
381 }
382 let current_certificates = self.storage().get_certificates_for_round(current_round);
384 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
386 Ok(committee) => committee,
387 Err(e) => {
388 error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
389 return false;
390 }
391 };
392 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
394 if !committee_lookback.is_quorum_threshold_reached(&authors) {
396 trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
397 return false;
398 }
399 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
401 return true;
403 };
404 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
406 leader_certificate.id(),
407 current_certificates,
408 &committee_lookback,
409 );
410 stake_with_leader >= committee_lookback.availability_threshold()
412 || stake_without_leader >= committee_lookback.quorum_threshold()
413 || self.is_timer_expired()
414 }
415
416 fn compute_stake_for_leader_certificate(
418 &self,
419 leader_certificate_id: Field<N>,
420 current_certificates: IndexSet<BatchCertificate<N>>,
421 current_committee: &Committee<N>,
422 ) -> (u64, u64) {
423 if current_certificates.is_empty() {
425 return (0, 0);
426 }
427
428 let mut stake_with_leader = 0u64;
430 let mut stake_without_leader = 0u64;
432 for certificate in current_certificates {
434 let stake = current_committee.get_stake(certificate.author());
436 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
438 true => stake_with_leader = stake_with_leader.saturating_add(stake),
440 false => stake_without_leader = stake_without_leader.saturating_add(stake),
442 }
443 }
444 (stake_with_leader, stake_without_leader)
446 }
447}
448
449impl<N: Network> BFT<N> {
450 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
452 &self,
453 certificate: BatchCertificate<N>,
454 ) -> Result<()> {
455 let _lock = self.lock.lock().await;
457
458 let certificate_round = certificate.round();
460 self.dag.write().insert(certificate);
462
463 let commit_round = certificate_round.saturating_sub(1);
465 if commit_round % 2 != 0 || commit_round < 2 {
467 return Ok(());
468 }
469 if commit_round <= self.dag.read().last_committed_round() {
471 return Ok(());
472 }
473
474 info!("Checking if the leader is ready to be committed for round {commit_round}...");
476
477 let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
479 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
480 };
481
482 let leader = match self.ledger().latest_leader() {
484 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
485 _ => {
486 let computed_leader = self.primary.account().address();
488 self.ledger().update_latest_leader(commit_round, computed_leader);
494
495 computed_leader
496 }
497 };
498
499 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
501 else {
502 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
503 return Ok(());
504 };
505 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
507 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
509 };
510 let authors = certificates
512 .values()
513 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
514 true => Some(c.author()),
515 false => None,
516 })
517 .collect();
518 if !committee_lookback.is_availability_threshold_reached(&authors) {
520 trace!("BFT is not ready to commit {commit_round}");
522 return Ok(());
523 }
524
525 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
527
528 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
530 }
531
532 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
534 &self,
535 leader_certificate: BatchCertificate<N>,
536 ) -> Result<()> {
537 let latest_leader_round = leader_certificate.round();
539 let mut leader_certificates = vec![leader_certificate.clone()];
542 {
543 let leader_round = leader_certificate.round();
545
546 let mut current_certificate = leader_certificate;
547 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
548 {
549 let leader = match self.ledger().latest_leader() {
559 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
560 _ => {
561 let computed_leader = self.primary.account().address();
563 self.ledger().update_latest_leader(round, computed_leader);
572
573 computed_leader
574 }
575 };
576 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
578 else {
579 continue;
580 };
581 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
583 leader_certificates.push(previous_certificate.clone());
585 current_certificate = previous_certificate;
587 }
588 }
589 }
590
591 for leader_certificate in leader_certificates.into_iter().rev() {
593 let leader_round = leader_certificate.round();
595 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
597 Ok(subdag) => subdag,
598 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
599 };
600 if !IS_SYNCING {
602 let mut transmissions = IndexMap::new();
604 let mut seen_transaction_ids = IndexSet::new();
606 let mut seen_solution_ids = IndexSet::new();
608 for certificate in commit_subdag.values().flatten() {
610 for transmission_id in certificate.transmission_ids() {
612 match transmission_id {
616 TransmissionID::Solution(solution_id, _) => {
617 if seen_solution_ids.contains(&solution_id) {
619 continue;
620 }
621 }
622 TransmissionID::Transaction(transaction_id, _) => {
623 if seen_transaction_ids.contains(transaction_id) {
625 continue;
626 }
627 }
628 TransmissionID::Ratification => {
629 bail!("Ratifications are currently not supported in the BFT.")
630 }
631 }
632 if transmissions.contains_key(transmission_id) {
634 continue;
635 }
636 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
639 continue;
640 }
641 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
643 bail!(
644 "BFT failed to retrieve transmission '{}.{}' from round {}",
645 fmt_id(transmission_id),
646 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
647 certificate.round()
648 );
649 };
650 match transmission_id {
652 TransmissionID::Solution(id, _) => {
653 seen_solution_ids.insert(id);
654 }
655 TransmissionID::Transaction(id, _) => {
656 seen_transaction_ids.insert(id);
657 }
658 TransmissionID::Ratification => {}
659 }
660 transmissions.insert(*transmission_id, transmission);
662 }
663 }
664 let subdag = Subdag::from(commit_subdag.clone())?;
667 let anchor_round = subdag.anchor_round();
669 let num_transmissions = transmissions.len();
671 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
673
674 ensure!(
676 anchor_round == leader_round,
677 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
678 );
679
680 if let Some(consensus_sender) = self.consensus_sender.get() {
682 let (callback_sender, callback_receiver) = oneshot::channel();
684 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
686 match callback_receiver.await {
688 Ok(Ok(())) => (), Ok(Err(e)) => {
690 error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
691 return Ok(());
692 }
693 Err(e) => {
694 error!("BFT failed to receive the callback for round {anchor_round} - {e}");
695 return Ok(());
696 }
697 }
698 }
699
700 info!(
701 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
702 );
703 }
704
705 let mut dag_write = self.dag.write();
707 for certificate in commit_subdag.values().flatten() {
708 dag_write.commit(certificate, self.storage().max_gc_rounds());
709 }
710 }
711
712 self.storage().garbage_collect_certificates(latest_leader_round);
714
715 Ok(())
716 }
717
718 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
720 &self,
721 leader_certificate: BatchCertificate<N>,
722 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
723 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
725 let mut already_ordered = HashSet::new();
727 let mut buffer = vec![leader_certificate];
729 while let Some(certificate) = buffer.pop() {
731 commit.entry(certificate.round()).or_default().insert(certificate.clone());
733
734 let previous_round = certificate.round().saturating_sub(1);
736 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
737 continue;
738 }
739 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
743 if already_ordered.contains(previous_certificate_id) {
745 continue;
746 }
747 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
749 continue;
750 }
751 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
753 continue;
754 }
755
756 let previous_certificate = {
758 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
760 Some(previous_certificate) => previous_certificate,
762 None => match self.storage().get_certificate(*previous_certificate_id) {
764 Some(previous_certificate) => previous_certificate,
766 None => bail!(
768 "Missing previous certificate {} for round {previous_round}",
769 fmt_id(previous_certificate_id)
770 ),
771 },
772 }
773 };
774 already_ordered.insert(previous_certificate.id());
776 buffer.push(previous_certificate);
778 }
779 }
780 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
782 Ok(commit)
784 }
785
786 fn is_linked(
788 &self,
789 previous_certificate: BatchCertificate<N>,
790 current_certificate: BatchCertificate<N>,
791 ) -> Result<bool> {
792 let mut traversal = vec![current_certificate.clone()];
794 for round in (previous_certificate.round()..current_certificate.round()).rev() {
796 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
798 bail!("BFT failed to retrieve the certificates for past round {round}");
801 };
802 traversal = certificates
804 .into_values()
805 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
806 .collect();
807 }
808 Ok(traversal.contains(&previous_certificate))
809 }
810}
811
812impl<N: Network> BFT<N> {
813 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
815 let BFTReceiver {
816 mut rx_primary_round,
817 mut rx_primary_certificate,
818 mut rx_sync_bft_dag_at_bootup,
819 mut rx_sync_bft,
820 } = bft_receiver;
821
822 let self_ = self.clone();
824 self.spawn(async move {
825 while let Some((current_round, callback)) = rx_primary_round.recv().await {
826 callback.send(self_.update_to_next_round(current_round)).ok();
827 }
828 });
829
830 let self_ = self.clone();
832 self.spawn(async move {
833 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
834 let result = self_.update_dag::<true, false>(certificate).await;
836 callback.send(result).ok();
839 }
840 });
841
842 let self_ = self.clone();
844 self.spawn(async move {
845 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
846 self_.sync_bft_dag_at_bootup(certificates).await;
847 }
848 });
849
850 let self_ = self.clone();
852 self.spawn(async move {
853 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
854 let result = self_.update_dag::<true, true>(certificate).await;
856 callback.send(result).ok();
859 }
860 });
861 }
862
863 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
870 let mut dag = self.dag.write();
872
873 for certificate in certificates {
875 dag.commit(&certificate, self.storage().max_gc_rounds());
876 }
877 }
878
879 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
881 self.handles.lock().push(tokio::spawn(future));
882 }
883
884 pub async fn shut_down(&self) {
886 info!("Shutting down the BFT...");
887 let _lock = self.lock.lock().await;
889 self.primary.shut_down().await;
891 self.handles.lock().iter().for_each(|handle| handle.abort());
893 }
894}
895
896#[cfg(test)]
897mod tests {
898 use crate::{
899 BFT,
900 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
901 helpers::{Storage, amareleo_storage_mode},
902 };
903
904 use amareleo_chain_account::Account;
905 use amareleo_node_bft_ledger_service::MockLedgerService;
906 use amareleo_node_bft_storage_service::BFTMemoryService;
907 use snarkvm::{
908 console::account::{Address, PrivateKey},
909 ledger::{
910 committee::Committee,
911 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
912 },
913 utilities::TestRng,
914 };
915
916 use aleo_std::StorageMode;
917 use anyhow::Result;
918 use indexmap::{IndexMap, IndexSet};
919 use rand::SeedableRng;
920 use rand_chacha::ChaChaRng;
921 use std::sync::Arc;
922
923 type CurrentNetwork = snarkvm::console::network::MainnetV0;
924
925 fn sample_test_instance(
927 committee_round: Option<u64>,
928 max_gc_rounds: u64,
929 rng: &mut TestRng,
930 ) -> (
931 Committee<CurrentNetwork>,
932 Account<CurrentNetwork>,
933 Arc<MockLedgerService<CurrentNetwork>>,
934 Storage<CurrentNetwork>,
935 ) {
936 let committee = match committee_round {
937 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
938 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
939 };
940 let account = Account::new(rng).unwrap();
941 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
942 let transmissions = Arc::new(BFTMemoryService::new());
943 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
944
945 (committee, account, ledger, storage)
946 }
947
948 #[test]
949 #[tracing_test::traced_test]
950 fn test_is_leader_quorum_odd() -> Result<()> {
951 let rng = &mut TestRng::default();
952
953 let mut certificates = IndexSet::new();
955 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
956 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
957 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
958 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
959
960 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
962 1,
963 vec![
964 certificates[0].author(),
965 certificates[1].author(),
966 certificates[2].author(),
967 certificates[3].author(),
968 ],
969 rng,
970 );
971
972 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
974 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
976 let account = Account::new(rng)?;
978 let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
980 assert!(bft.is_timer_expired());
981 let result = bft.is_leader_quorum_or_nonleaders_available(1);
983 assert!(!result);
985 for certificate in certificates.iter() {
987 storage.testing_only_insert_certificate_testing_only(certificate.clone());
988 }
989 let result = bft.is_leader_quorum_or_nonleaders_available(1);
991 assert!(result); let leader_certificate = sample_batch_certificate(rng);
994 *bft.leader_certificate.write() = Some(leader_certificate);
995 let result = bft.is_leader_quorum_or_nonleaders_available(1);
997 assert!(result); Ok(())
1000 }
1001
1002 #[test]
1003 #[tracing_test::traced_test]
1004 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1005 let rng = &mut TestRng::default();
1006
1007 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1009 assert_eq!(committee.starting_round(), 1);
1010 assert_eq!(storage.current_round(), 1);
1011 assert_eq!(storage.max_gc_rounds(), 10);
1012
1013 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1015 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1020 assert!(!result);
1021 Ok(())
1022 }
1023
1024 #[test]
1025 #[tracing_test::traced_test]
1026 fn test_is_leader_quorum_even() -> Result<()> {
1027 let rng = &mut TestRng::default();
1028
1029 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1031 assert_eq!(committee.starting_round(), 2);
1032 assert_eq!(storage.current_round(), 2);
1033 assert_eq!(storage.max_gc_rounds(), 10);
1034
1035 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1037 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1041 assert!(!result);
1042 Ok(())
1043 }
1044
1045 #[test]
1046 #[tracing_test::traced_test]
1047 fn test_is_even_round_ready() -> Result<()> {
1048 let rng = &mut TestRng::default();
1049
1050 let mut certificates = IndexSet::new();
1052 certificates.insert(sample_batch_certificate_for_round(2, rng));
1053 certificates.insert(sample_batch_certificate_for_round(2, rng));
1054 certificates.insert(sample_batch_certificate_for_round(2, rng));
1055 certificates.insert(sample_batch_certificate_for_round(2, rng));
1056
1057 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1059 2,
1060 vec![
1061 certificates[0].author(),
1062 certificates[1].author(),
1063 certificates[2].author(),
1064 certificates[3].author(),
1065 ],
1066 rng,
1067 );
1068
1069 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1071 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1073 let account = Account::new(rng)?;
1075 let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
1077 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1079 *bft.leader_certificate.write() = Some(leader_certificate);
1080 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1081 assert!(!result);
1083 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1085 assert!(result);
1086
1087 let bft_timer = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
1089 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1091 if !bft_timer.is_timer_expired() {
1092 assert!(!result);
1093 }
1094 let leader_certificate_timeout =
1096 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1097 std::thread::sleep(leader_certificate_timeout);
1098 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1100 if bft_timer.is_timer_expired() {
1101 assert!(result);
1102 } else {
1103 assert!(!result);
1104 }
1105
1106 Ok(())
1107 }
1108
1109 #[test]
1110 #[tracing_test::traced_test]
1111 fn test_update_leader_certificate_odd() -> Result<()> {
1112 let rng = &mut TestRng::default();
1113
1114 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1116 assert_eq!(storage.max_gc_rounds(), 10);
1117
1118 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1120
1121 let result = bft.update_leader_certificate_to_even_round(1);
1123 assert!(!result);
1124 Ok(())
1125 }
1126
1127 #[test]
1128 #[tracing_test::traced_test]
1129 fn test_update_leader_certificate_bad_round() -> Result<()> {
1130 let rng = &mut TestRng::default();
1131
1132 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1134 assert_eq!(storage.max_gc_rounds(), 10);
1135
1136 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1138
1139 let result = bft.update_leader_certificate_to_even_round(6);
1141 assert!(!result);
1142 Ok(())
1143 }
1144
1145 #[test]
1146 #[tracing_test::traced_test]
1147 fn test_update_leader_certificate_even() -> Result<()> {
1148 let rng = &mut TestRng::default();
1149
1150 let current_round = 3;
1152
1153 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1155 current_round,
1156 rng,
1157 );
1158
1159 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1161 2,
1162 vec![
1163 certificates[0].author(),
1164 certificates[1].author(),
1165 certificates[2].author(),
1166 certificates[3].author(),
1167 ],
1168 rng,
1169 );
1170
1171 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1173
1174 let transmissions = Arc::new(BFTMemoryService::new());
1176 let storage = Storage::new(ledger.clone(), transmissions, 10);
1177 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1178 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1179 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1180 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1181 assert_eq!(storage.current_round(), 2);
1182
1183 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1186
1187 let account = Account::new(rng)?;
1189 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger)?;
1190
1191 *bft.leader_certificate.write() = Some(leader_certificate);
1193
1194 let result = bft.update_leader_certificate_to_even_round(2);
1197 assert!(result);
1198
1199 Ok(())
1200 }
1201
1202 #[tokio::test]
1203 #[tracing_test::traced_test]
1204 async fn test_order_dag_with_dfs() -> Result<()> {
1205 let rng = &mut TestRng::default();
1206
1207 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1209
1210 let previous_round = 2; let current_round = previous_round + 1;
1213
1214 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1216 current_round,
1217 rng,
1218 );
1219
1220 {
1224 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1226 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone())?;
1228
1229 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1231
1232 for certificate in previous_certificates.clone() {
1234 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1235 }
1236
1237 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1239 assert!(result.is_ok());
1240 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1241 assert_eq!(candidate_certificates.len(), 1);
1242 let expected_certificates = vec![certificate.clone()];
1243 assert_eq!(
1244 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1245 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1246 );
1247 assert_eq!(candidate_certificates, expected_certificates);
1248 }
1249
1250 {
1254 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1256 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1258
1259 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1261
1262 for certificate in previous_certificates.clone() {
1264 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1265 }
1266
1267 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1269 assert!(result.is_ok());
1270 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1271 assert_eq!(candidate_certificates.len(), 5);
1272 let expected_certificates = vec![
1273 previous_certificates[0].clone(),
1274 previous_certificates[1].clone(),
1275 previous_certificates[2].clone(),
1276 previous_certificates[3].clone(),
1277 certificate,
1278 ];
1279 assert_eq!(
1280 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1281 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1282 );
1283 assert_eq!(candidate_certificates, expected_certificates);
1284 }
1285
1286 Ok(())
1287 }
1288
1289 #[test]
1290 #[tracing_test::traced_test]
1291 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1292 let rng = &mut TestRng::default();
1293
1294 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1296 assert_eq!(committee.starting_round(), 1);
1297 assert_eq!(storage.current_round(), 1);
1298 assert_eq!(storage.max_gc_rounds(), 1);
1299
1300 let previous_round = 2; let current_round = previous_round + 1;
1303
1304 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1306 current_round,
1307 rng,
1308 );
1309 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1311
1312 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1316
1317 let error_msg = format!(
1319 "Missing previous certificate {} for round {previous_round}",
1320 crate::helpers::fmt_id(previous_certificate_ids[3]),
1321 );
1322
1323 let result = bft.order_dag_with_dfs::<false>(certificate);
1325 assert!(result.is_err());
1326 assert_eq!(result.unwrap_err().to_string(), error_msg);
1327 Ok(())
1328 }
1329
1330 #[tokio::test]
1331 #[tracing_test::traced_test]
1332 async fn test_bft_gc_on_commit() -> Result<()> {
1333 let rng = &mut TestRng::default();
1334
1335 let max_gc_rounds = 1;
1337 let committee_round = 0;
1338 let commit_round = 2;
1339 let current_round = commit_round + 1;
1340
1341 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1343 current_round,
1344 rng,
1345 );
1346
1347 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1349 committee_round,
1350 vec![
1351 certificates[0].author(),
1352 certificates[1].author(),
1353 certificates[2].author(),
1354 certificates[3].author(),
1355 ],
1356 rng,
1357 );
1358
1359 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1361
1362 let transmissions = Arc::new(BFTMemoryService::new());
1364 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1365 for certificate in certificates.iter() {
1367 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1368 }
1369
1370 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1373
1374 let account = Account::new(rng)?;
1376 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger)?;
1377 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1379
1380 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1382
1383 for certificate in certificates {
1385 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1386 }
1387
1388 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1390
1391 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1393
1394 Ok(())
1395 }
1396
1397 #[tokio::test]
1398 #[tracing_test::traced_test]
1399 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1400 let rng = &mut TestRng::default();
1401
1402 let max_gc_rounds = 1;
1404 let committee_round = 0;
1405 let commit_round = 2;
1406 let current_round = commit_round + 1;
1407
1408 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1410 current_round,
1411 rng,
1412 );
1413
1414 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1416 committee_round,
1417 vec![
1418 certificates[0].author(),
1419 certificates[1].author(),
1420 certificates[2].author(),
1421 certificates[3].author(),
1422 ],
1423 rng,
1424 );
1425
1426 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1428
1429 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1431 for certificate in certificates.iter() {
1433 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1434 }
1435
1436 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1439
1440 let account = Account::new(rng)?;
1442 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone())?;
1443
1444 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1446
1447 for certificate in certificates.clone() {
1449 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1450 }
1451
1452 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1454
1455 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1459 let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger)?;
1461
1462 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1464
1465 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1467
1468 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1470 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1471
1472 for certificate in certificates {
1474 let certificate_round = certificate.round();
1475 let certificate_id = certificate.id();
1476 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1478 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1481 }
1482
1483 Ok(())
1484 }
1485
1486 #[tokio::test]
1487 #[tracing_test::traced_test]
1488 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1489 let rng = &mut TestRng::default();
1496 let rng_pks = &mut ChaChaRng::seed_from_u64(1234567890u64);
1497
1498 let private_keys = vec![
1499 PrivateKey::new(rng_pks).unwrap(),
1500 PrivateKey::new(rng_pks).unwrap(),
1501 PrivateKey::new(rng_pks).unwrap(),
1502 PrivateKey::new(rng_pks).unwrap(),
1503 ];
1504 let address0 = Address::try_from(private_keys[0])?;
1505
1506 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1508 let committee_round = 0;
1509 let commit_round = 2;
1510 let current_round = commit_round + 1;
1511 let next_round = current_round + 1;
1512
1513 let (round_to_certificates_map, committee) = {
1515 let addresses = vec![
1516 Address::try_from(private_keys[0])?,
1517 Address::try_from(private_keys[1])?,
1518 Address::try_from(private_keys[2])?,
1519 Address::try_from(private_keys[3])?,
1520 ];
1521 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1522 committee_round,
1523 addresses,
1524 rng,
1525 );
1526 let mut round_to_certificates_map: IndexMap<
1528 u64,
1529 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1530 > = IndexMap::new();
1531 let mut previous_certificates = IndexSet::with_capacity(4);
1532 for _ in 0..4 {
1534 previous_certificates.insert(sample_batch_certificate(rng));
1535 }
1536 for round in 0..commit_round + 3 {
1537 let mut current_certificates = IndexSet::new();
1538 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1539 IndexSet::new()
1540 } else {
1541 previous_certificates.iter().map(|c| c.id()).collect()
1542 };
1543 let transmission_ids =
1544 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1545 .into_iter()
1546 .collect::<IndexSet<_>>();
1547 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1548 let committee_id = committee.id();
1549 for (i, private_key_1) in private_keys.iter().enumerate() {
1550 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1551 private_key_1,
1552 round,
1553 timestamp,
1554 committee_id,
1555 transmission_ids.clone(),
1556 previous_certificate_ids.clone(),
1557 rng,
1558 )
1559 .unwrap();
1560 let mut signatures = IndexSet::with_capacity(4);
1561 for (j, private_key_2) in private_keys.iter().enumerate() {
1562 if i != j {
1563 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1564 }
1565 }
1566 let certificate =
1567 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1568 current_certificates.insert(certificate);
1569 }
1570 round_to_certificates_map.insert(round, current_certificates.clone());
1572 previous_certificates = current_certificates.clone();
1573 }
1574 (round_to_certificates_map, committee)
1575 };
1576
1577 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1579 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1581 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1586 for i in 1..=commit_round {
1587 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1588 if i == commit_round {
1589 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1591 if let Some(c) = leader_certificate {
1592 pre_shutdown_certificates.push(c.clone());
1593 }
1594 continue;
1595 }
1596 pre_shutdown_certificates.extend(certificates);
1597 }
1598 for certificate in pre_shutdown_certificates.iter() {
1599 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1600 }
1601 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1603 Vec::new();
1604 for j in commit_round..=commit_round + 2 {
1605 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1606 post_shutdown_certificates.extend(certificate);
1607 }
1608 for certificate in post_shutdown_certificates.iter() {
1609 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1610 }
1611 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1613 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1614
1615 let account = Account::try_from(private_keys[0])?;
1617 let bft = BFT::new(account.clone(), storage, true, amareleo_storage_mode(1, true, None), ledger.clone())?;
1618
1619 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1621
1622 for certificate in pre_shutdown_certificates.clone() {
1624 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1625 }
1626
1627 for certificate in post_shutdown_certificates.clone() {
1629 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1630 }
1631 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1633 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1634 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1635
1636 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1640
1641 let bootup_bft =
1643 BFT::new(account, bootup_storage.clone(), true, amareleo_storage_mode(1, true, None), ledger.clone())?;
1644
1645 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1647
1648 for certificate in post_shutdown_certificates.iter() {
1650 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1651 }
1652 for certificate in post_shutdown_certificates.clone() {
1653 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1654 }
1655 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1657 let commit_subdag_metadata_bootup =
1658 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1659 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1660 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1661
1662 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1666
1667 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1669 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1670 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1671 assert!(
1672 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1673 );
1674
1675 for certificate in pre_shutdown_certificates.clone() {
1677 let certificate_round = certificate.round();
1678 let certificate_id = certificate.id();
1679 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1681 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1682 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1685 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1686 }
1687
1688 for certificate in committed_certificates_bootup.clone() {
1690 let certificate_round = certificate.round();
1691 let certificate_id = certificate.id();
1692 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1694 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1695 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1698 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1699 }
1700
1701 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1703
1704 Ok(())
1705 }
1706
1707 #[tokio::test]
1708 #[tracing_test::traced_test]
1709 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1710 let rng = &mut TestRng::default();
1717 let rng_pks = &mut ChaChaRng::seed_from_u64(1234567890u64);
1718
1719 let private_keys = vec![
1720 PrivateKey::new(rng_pks).unwrap(),
1721 PrivateKey::new(rng_pks).unwrap(),
1722 PrivateKey::new(rng_pks).unwrap(),
1723 PrivateKey::new(rng_pks).unwrap(),
1724 ];
1725 let address0 = Address::try_from(private_keys[0])?;
1726
1727 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1729 let committee_round = 0;
1730 let commit_round = 2;
1731 let current_round = commit_round + 1;
1732 let next_round = current_round + 1;
1733
1734 let (round_to_certificates_map, committee) = {
1736 let addresses = vec![
1737 Address::try_from(private_keys[0])?,
1738 Address::try_from(private_keys[1])?,
1739 Address::try_from(private_keys[2])?,
1740 Address::try_from(private_keys[3])?,
1741 ];
1742 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1743 committee_round,
1744 addresses,
1745 rng,
1746 );
1747 let mut round_to_certificates_map: IndexMap<
1749 u64,
1750 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1751 > = IndexMap::new();
1752 let mut previous_certificates = IndexSet::with_capacity(4);
1753 for _ in 0..4 {
1755 previous_certificates.insert(sample_batch_certificate(rng));
1756 }
1757 for round in 0..=commit_round + 2 {
1758 let mut current_certificates = IndexSet::new();
1759 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1760 IndexSet::new()
1761 } else {
1762 previous_certificates.iter().map(|c| c.id()).collect()
1763 };
1764 let transmission_ids =
1765 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1766 .into_iter()
1767 .collect::<IndexSet<_>>();
1768 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1769 let committee_id = committee.id();
1770 for (i, private_key_1) in private_keys.iter().enumerate() {
1771 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1772 private_key_1,
1773 round,
1774 timestamp,
1775 committee_id,
1776 transmission_ids.clone(),
1777 previous_certificate_ids.clone(),
1778 rng,
1779 )
1780 .unwrap();
1781 let mut signatures = IndexSet::with_capacity(4);
1782 for (j, private_key_2) in private_keys.iter().enumerate() {
1783 if i != j {
1784 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1785 }
1786 }
1787 let certificate =
1788 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1789 current_certificates.insert(certificate);
1790 }
1791 round_to_certificates_map.insert(round, current_certificates.clone());
1793 previous_certificates = current_certificates.clone();
1794 }
1795 (round_to_certificates_map, committee)
1796 };
1797
1798 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1800 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1802 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1807 for i in 1..=commit_round {
1808 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1809 if i == commit_round {
1810 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1812 if let Some(c) = leader_certificate {
1813 pre_shutdown_certificates.push(c.clone());
1814 }
1815 continue;
1816 }
1817 pre_shutdown_certificates.extend(certificates);
1818 }
1819 for certificate in pre_shutdown_certificates.iter() {
1820 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1821 }
1822 let account = Account::try_from(private_keys[0])?;
1824 let bootup_bft =
1825 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
1826 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1828 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1830
1831 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1833 Vec::new();
1834 for j in commit_round..=commit_round + 2 {
1835 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1836 post_shutdown_certificates.extend(certificate);
1837 }
1838 for certificate in post_shutdown_certificates.iter() {
1839 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1840 }
1841
1842 for certificate in post_shutdown_certificates.clone() {
1844 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1845 }
1846
1847 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1849 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1850 let committed_certificates = commit_subdag.values().flatten();
1851
1852 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1854 for committed_certificate in committed_certificates.clone() {
1855 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1856 }
1857 }
1858 Ok(())
1859 }
1860}