1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::TracingHandler;
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36 console::account::Address,
37 ledger::{
38 block::Transaction,
39 committee::Committee,
40 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41 puzzle::{Solution, SolutionID},
42 },
43 prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48use parking_lot::{Mutex, RwLock};
49use std::{
50 collections::{BTreeMap, HashSet},
51 future::Future,
52 sync::{
53 Arc,
54 atomic::{AtomicI64, Ordering},
55 },
56};
57use tokio::{
58 sync::{Mutex as TMutex, OnceCell, oneshot},
59 task::JoinHandle,
60};
61use tracing::subscriber::DefaultGuard;
62
/// The BFT instance: a [`Primary`] bundled with the certificate DAG and the
/// leader-certificate state used to decide when rounds advance and commit.
///
/// The type is `Clone`; every piece of mutable state is behind an `Arc`, so clones
/// share the same DAG, leader certificate, timer, consensus sender, handles and lock.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary instance; most public accessors on `BFT` delegate to it.
    primary: Primary<N>,
    /// The DAG of batch certificates.
    dag: Arc<RwLock<DAG<N>>>,
    /// The leader's batch certificate selected for the most recent even round, if any.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The `now()` timestamp stored on the last successful round increment (default `0`).
    leader_certificate_timer: Arc<AtomicI64>,
    /// The sender towards consensus; set at most once, at the end of `run`.
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// Optional tracing handle used to subscribe the current thread before logging.
    tracing: Option<TracingHandler>,
    /// Join handles of the tasks spawned by this instance; aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Async lock held while updating the DAG and while shutting down.
    lock: Arc<TMutex<()>>,
}
82
83impl<N: Network> BFT<N> {
84 pub fn new(
86 account: Account<N>,
87 storage: Storage<N>,
88 keep_state: bool,
89 storage_mode: StorageMode,
90 ledger: Arc<dyn LedgerService<N>>,
91 tracing: Option<TracingHandler>,
92 ) -> Result<Self> {
93 Ok(Self {
94 primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
95 dag: Default::default(),
96 leader_certificate: Default::default(),
97 leader_certificate_timer: Default::default(),
98 consensus_sender: Default::default(),
99 tracing: tracing.clone(),
100 handles: Default::default(),
101 lock: Default::default(),
102 })
103 }
104
105 pub async fn run(
107 &mut self,
108 consensus_sender: Option<ConsensusSender<N>>,
109 primary_sender: PrimarySender<N>,
110 primary_receiver: PrimaryReceiver<N>,
111 ) -> Result<()> {
112 let _guard = self.get_tracing_guard();
113 info!("Starting the BFT instance...");
114 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
116 self.start_handlers(bft_receiver);
118 self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
120 if let Some(consensus_sender) = consensus_sender {
123 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
124 }
125 Ok(())
126 }
127
128 pub fn is_synced(&self) -> bool {
130 self.primary.is_synced()
131 }
132
133 pub const fn primary(&self) -> &Primary<N> {
135 &self.primary
136 }
137
138 pub const fn storage(&self) -> &Storage<N> {
140 self.primary.storage()
141 }
142
143 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
145 self.primary.ledger()
146 }
147
148 pub fn leader(&self) -> Option<Address<N>> {
150 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
151 }
152
153 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
155 &self.leader_certificate
156 }
157
158 pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
160 self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
161 }
162}
163
164impl<N: Network> BFT<N> {
165 pub fn num_unconfirmed_transmissions(&self) -> usize {
167 self.primary.num_unconfirmed_transmissions()
168 }
169
170 pub fn num_unconfirmed_ratifications(&self) -> usize {
172 self.primary.num_unconfirmed_ratifications()
173 }
174
175 pub fn num_unconfirmed_solutions(&self) -> usize {
177 self.primary.num_unconfirmed_solutions()
178 }
179
180 pub fn num_unconfirmed_transactions(&self) -> usize {
182 self.primary.num_unconfirmed_transactions()
183 }
184}
185
186impl<N: Network> BFT<N> {
187 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
189 self.primary.worker_transmission_ids()
190 }
191
192 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
194 self.primary.worker_transmissions()
195 }
196
197 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
199 self.primary.worker_solutions()
200 }
201
202 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
204 self.primary.worker_transactions()
205 }
206}
207
208impl<N: Network> BFT<N> {
209 fn update_to_next_round(&self, current_round: u64) -> bool {
211 let _guard = self.get_tracing_guard();
212
213 let storage_round = self.storage().current_round();
215 if current_round < storage_round {
216 debug!(
217 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
218 );
219 return false;
220 }
221
222 let is_ready = match current_round % 2 == 0 {
224 true => self.update_leader_certificate_to_even_round(current_round),
225 false => self.is_leader_quorum_or_nonleaders_available(current_round),
226 };
227
228 #[cfg(feature = "metrics")]
229 {
230 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
231 if start > 0 {
233 let end = now();
234 let elapsed = std::time::Duration::from_secs((end - start) as u64);
235 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
236 }
237 }
238
239 if current_round % 2 == 0 {
241 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
243 if !is_ready {
245 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
246 }
247 let leader_round = leader_certificate.round();
249 match leader_round == current_round {
250 true => {
251 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
252 #[cfg(feature = "metrics")]
253 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
254 }
255 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
256 }
257 } else {
258 match is_ready {
259 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
260 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
261 }
262 }
263 }
264
265 if is_ready {
267 if let Err(e) = self.storage().increment_to_next_round(current_round) {
269 warn!("BFT failed to increment to the next round from round {current_round} - {e}");
270 return false;
271 }
272 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
274 }
275
276 is_ready
277 }
278
279 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
285 let _guard = self.get_tracing_guard();
286 let current_round = self.storage().current_round();
288 if current_round != even_round {
290 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
291 return false;
292 }
293
294 if current_round % 2 != 0 || current_round < 2 {
296 error!("BFT cannot update the leader certificate in an odd round");
297 return false;
298 }
299
300 let current_certificates = self.storage().get_certificates_for_round(current_round);
302 if current_certificates.is_empty() {
304 *self.leader_certificate.write() = None;
306 return false;
307 }
308
309 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
311 Ok(committee) => committee,
312 Err(e) => {
313 error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
314 return false;
315 }
316 };
317 let leader = match self.ledger().latest_leader() {
319 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
320 _ => {
321 let computed_leader = self.primary.account().address();
323 self.ledger().update_latest_leader(current_round, computed_leader);
333
334 computed_leader
335 }
336 };
337 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
339 *self.leader_certificate.write() = leader_certificate.cloned();
340
341 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
342 }
343
344 fn is_even_round_ready_for_next_round(
348 &self,
349 certificates: IndexSet<BatchCertificate<N>>,
350 committee: Committee<N>,
351 current_round: u64,
352 ) -> bool {
353 let _guard = self.get_tracing_guard();
354
355 let authors = certificates.into_iter().map(|c| c.author()).collect();
357 if !committee.is_quorum_threshold_reached(&authors) {
359 trace!("BFT failed to reach quorum threshold in even round {current_round}");
360 return false;
361 }
362 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
364 if leader_certificate.round() == current_round {
365 return true;
366 }
367 }
368 if self.is_timer_expired() {
370 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
371 return true;
372 }
373 false
375 }
376
377 fn is_timer_expired(&self) -> bool {
379 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
380 }
381
382 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
387 let _guard = self.get_tracing_guard();
388
389 let current_round = self.storage().current_round();
391 if current_round != odd_round {
393 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
394 return false;
395 }
396 if current_round % 2 != 1 {
398 error!("BFT does not compute stakes for the leader certificate in an even round");
399 return false;
400 }
401 let current_certificates = self.storage().get_certificates_for_round(current_round);
403 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
405 Ok(committee) => committee,
406 Err(e) => {
407 error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
408 return false;
409 }
410 };
411 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
413 if !committee_lookback.is_quorum_threshold_reached(&authors) {
415 trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
416 return false;
417 }
418 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
420 return true;
422 };
423 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
425 leader_certificate.id(),
426 current_certificates,
427 &committee_lookback,
428 );
429 stake_with_leader >= committee_lookback.availability_threshold()
431 || stake_without_leader >= committee_lookback.quorum_threshold()
432 || self.is_timer_expired()
433 }
434
435 fn compute_stake_for_leader_certificate(
437 &self,
438 leader_certificate_id: Field<N>,
439 current_certificates: IndexSet<BatchCertificate<N>>,
440 current_committee: &Committee<N>,
441 ) -> (u64, u64) {
442 if current_certificates.is_empty() {
444 return (0, 0);
445 }
446
447 let mut stake_with_leader = 0u64;
449 let mut stake_without_leader = 0u64;
451 for certificate in current_certificates {
453 let stake = current_committee.get_stake(certificate.author());
455 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
457 true => stake_with_leader = stake_with_leader.saturating_add(stake),
459 false => stake_without_leader = stake_without_leader.saturating_add(stake),
461 }
462 }
463 (stake_with_leader, stake_without_leader)
465 }
466}
467
468impl<N: Network> BFT<N> {
469 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
471 &self,
472 certificate: BatchCertificate<N>,
473 ) -> Result<()> {
474 let _lock = self.lock.lock().await;
476 let _guard = self.get_tracing_guard();
477
478 let certificate_round = certificate.round();
480 self.dag.write().insert(certificate, self.tracing.clone());
482
483 let commit_round = certificate_round.saturating_sub(1);
485 if commit_round % 2 != 0 || commit_round < 2 {
487 return Ok(());
488 }
489 if commit_round <= self.dag.read().last_committed_round() {
491 return Ok(());
492 }
493
494 info!("Checking if the leader is ready to be committed for round {commit_round}...");
496
497 let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
499 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
500 };
501
502 let leader = match self.ledger().latest_leader() {
504 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
505 _ => {
506 let computed_leader = self.primary.account().address();
508 self.ledger().update_latest_leader(commit_round, computed_leader);
514
515 computed_leader
516 }
517 };
518
519 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
521 else {
522 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
523 return Ok(());
524 };
525 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
527 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
529 };
530 let authors = certificates
532 .values()
533 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
534 true => Some(c.author()),
535 false => None,
536 })
537 .collect();
538 if !committee_lookback.is_availability_threshold_reached(&authors) {
540 trace!("BFT is not ready to commit {commit_round}");
542 return Ok(());
543 }
544
545 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
547
548 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
550 }
551
    /// Commits the given leader certificate, preceded by any earlier leader certificates
    /// that are linked to it, and then garbage-collects storage up to the given leader's round.
    ///
    /// When `IS_SYNCING` is `false`, the transmissions of each subdag are gathered from
    /// storage and, if a consensus sender is set, the subdag is sent to consensus and its
    /// callback awaited. If the callback reports an error or is dropped, the error is
    /// logged and the function returns `Ok(())` early, leaving that subdag (and any later
    /// ones) uncommitted in the DAG and skipping garbage collection.
    ///
    /// `ALLOW_LEDGER_ACCESS` is forwarded to `order_dag_with_dfs`.
    ///
    /// # Errors
    /// Fails if the link check between leader certificates fails, the DAG cannot be
    /// ordered, a ratification transmission ID is encountered, a transmission is missing
    /// from storage, the subdag cannot be constructed, the subdag's anchor round differs
    /// from the leader round, or the send to consensus fails.
    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();

        // Remember the round of the newest leader for the garbage collection at the end.
        let latest_leader_round = leader_certificate.round();
        // The leader certificates to commit, ordered from newest to oldest.
        let mut leader_certificates = vec![leader_certificate.clone()];
        {
            let leader_round = leader_certificate.round();

            let mut current_certificate = leader_certificate;
            // Walk backwards in steps of two, starting at `leader_round - 2` and going no
            // lower than `last_committed_round + 2`.
            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
            {
                // Use the cached leader if it was cached for this round; otherwise fall
                // back to this node's own address and update the cache.
                let leader = match self.ledger().latest_leader() {
                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
                    _ => {
                        let computed_leader = self.primary.account().address();
                        self.ledger().update_latest_leader(round, computed_leader);

                        computed_leader
                    }
                };
                // Skip rounds for which the leader's certificate is not in the DAG.
                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
                else {
                    continue;
                };
                // Only leaders reachable from the current one are added to the chain.
                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                    leader_certificates.push(previous_certificate.clone());
                    current_certificate = previous_certificate;
                }
            }
        }

        // Commit the leaders from oldest to newest.
        for leader_certificate in leader_certificates.into_iter().rev() {
            let leader_round = leader_certificate.round();
            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
                Ok(subdag) => subdag,
                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
            };
            if !IS_SYNCING {
                // The transmissions to hand to consensus, in first-seen order.
                let mut transmissions = IndexMap::new();
                // Solution and transaction IDs already collected, regardless of the rest
                // of the transmission ID they appeared under.
                let mut seen_transaction_ids = IndexSet::new();
                let mut seen_solution_ids = IndexSet::new();
                for certificate in commit_subdag.values().flatten() {
                    for transmission_id in certificate.transmission_ids() {
                        // Skip solutions and transactions whose inner ID was already collected.
                        match transmission_id {
                            TransmissionID::Solution(solution_id, _) => {
                                if seen_solution_ids.contains(&solution_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Transaction(transaction_id, _) => {
                                if seen_transaction_ids.contains(transaction_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Ratification => {
                                bail!("Ratifications are currently not supported in the BFT.")
                            }
                        }
                        // Skip exact duplicates of a transmission ID.
                        if transmissions.contains_key(transmission_id) {
                            continue;
                        }
                        // Skip transmissions the ledger already contains; a failed lookup is
                        // treated the same as "already contained".
                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
                            continue;
                        }
                        // Everything else must be available in storage.
                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
                            bail!(
                                "BFT failed to retrieve transmission '{}.{}' from round {}",
                                fmt_id(transmission_id),
                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
                                certificate.round()
                            );
                        };
                        // Mark the inner ID as collected.
                        match transmission_id {
                            TransmissionID::Solution(id, _) => {
                                seen_solution_ids.insert(id);
                            }
                            TransmissionID::Transaction(id, _) => {
                                seen_transaction_ids.insert(id);
                            }
                            TransmissionID::Ratification => {}
                        }
                        transmissions.insert(*transmission_id, transmission);
                    }
                }
                // Build the subdag and capture what is needed for logging before it is
                // moved into the consensus channel.
                let subdag = Subdag::from(commit_subdag.clone())?;
                let anchor_round = subdag.anchor_round();
                let num_transmissions = transmissions.len();
                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

                ensure!(
                    anchor_round == leader_round,
                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
                );

                // Hand the subdag to consensus (if connected) and wait for its verdict.
                if let Some(consensus_sender) = self.consensus_sender.get() {
                    let (callback_sender, callback_receiver) = oneshot::channel();
                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
                    match callback_receiver.await {
                        Ok(Ok(())) => (),
                        // Consensus rejected the subdag: log and stop without an error.
                        Ok(Err(e)) => {
                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
                            return Ok(());
                        }
                        // The callback was dropped: log and stop without an error.
                        Err(e) => {
                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
                            return Ok(());
                        }
                    }
                }

                info!(
                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
                );
            }

            // Mark every certificate of the subdag as committed in the DAG. The write guard
            // is taken only here, after all awaits of this iteration.
            let mut dag_write = self.dag.write();
            for certificate in commit_subdag.values().flatten() {
                dag_write.commit(certificate, self.storage().max_gc_rounds());
            }
        }

        // Garbage-collect storage based on the newest leader round.
        self.storage().garbage_collect_certificates(latest_leader_round);

        Ok(())
    }
739
    /// Collects the certificates reachable from `leader_certificate` through previous
    /// certificate IDs, grouped by round.
    ///
    /// Traversal uses an explicit stack. A previous certificate is not followed when it
    /// was already pushed, when the DAG reports it as recently committed, or (only with
    /// `ALLOW_LEDGER_ACCESS`) when the ledger contains it. Certificates are looked up in
    /// the DAG first and in storage second.
    ///
    /// Rounds for which `round + max_gc_rounds <= last_committed_round` are dropped from
    /// the result.
    ///
    /// # Errors
    /// Fails if a previous certificate that must be followed is found in neither the DAG
    /// nor storage.
    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
        // The certificates to commit, keyed by round.
        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
        // IDs of certificates that were already pushed onto the stack.
        let mut already_ordered = HashSet::new();
        // The traversal stack, seeded with the leader certificate.
        let mut buffer = vec![leader_certificate];
        while let Some(certificate) = buffer.pop() {
            commit.entry(certificate.round()).or_default().insert(certificate.clone());

            // Do not descend into rounds that fall outside the garbage-collection window.
            let previous_round = certificate.round().saturating_sub(1);
            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
                continue;
            }
            // Iterate in reverse, so that popping from the stack visits the previous
            // certificates in their original order.
            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
                if already_ordered.contains(previous_certificate_id) {
                    continue;
                }
                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
                    continue;
                }
                // A failed ledger lookup is treated as "not contained".
                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
                    continue;
                }

                // Look in the DAG first, then fall back to storage.
                let previous_certificate = {
                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
                        Some(previous_certificate) => previous_certificate,
                        None => match self.storage().get_certificate(*previous_certificate_id) {
                            Some(previous_certificate) => previous_certificate,
                            None => bail!(
                                "Missing previous certificate {} for round {previous_round}",
                                fmt_id(previous_certificate_id)
                            ),
                        },
                    }
                };
                already_ordered.insert(previous_certificate.id());
                buffer.push(previous_certificate);
            }
        }
        // Drop rounds that are outside the garbage-collection window.
        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
        Ok(commit)
    }
807
808 fn is_linked(
810 &self,
811 previous_certificate: BatchCertificate<N>,
812 current_certificate: BatchCertificate<N>,
813 ) -> Result<bool> {
814 let mut traversal = vec![current_certificate.clone()];
816 for round in (previous_certificate.round()..current_certificate.round()).rev() {
818 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
820 bail!("BFT failed to retrieve the certificates for past round {round}");
823 };
824 traversal = certificates
826 .into_values()
827 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
828 .collect();
829 }
830 Ok(traversal.contains(&previous_certificate))
831 }
832}
833
834impl<N: Network> BFT<N> {
835 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
837 let BFTReceiver {
838 mut rx_primary_round,
839 mut rx_primary_certificate,
840 mut rx_sync_bft_dag_at_bootup,
841 mut rx_sync_bft,
842 } = bft_receiver;
843
844 let self_ = self.clone();
846 self.spawn(async move {
847 while let Some((current_round, callback)) = rx_primary_round.recv().await {
848 callback.send(self_.update_to_next_round(current_round)).ok();
849 }
850 });
851
852 let self_ = self.clone();
854 self.spawn(async move {
855 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
856 let result = self_.update_dag::<true, false>(certificate).await;
858 callback.send(result).ok();
861 }
862 });
863
864 let self_ = self.clone();
866 self.spawn(async move {
867 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
868 self_.sync_bft_dag_at_bootup(certificates).await;
869 }
870 });
871
872 let self_ = self.clone();
874 self.spawn(async move {
875 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
876 let result = self_.update_dag::<true, true>(certificate).await;
878 callback.send(result).ok();
881 }
882 });
883 }
884
885 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
892 let mut dag = self.dag.write();
894
895 for certificate in certificates {
897 dag.commit(&certificate, self.storage().max_gc_rounds());
898 }
899 }
900
901 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
903 self.handles.lock().push(tokio::spawn(future));
904 }
905
906 pub async fn shut_down(&self) {
908 let _guard = self.get_tracing_guard();
909 info!("Shutting down the BFT...");
910 let _lock = self.lock.lock().await;
912 self.primary.shut_down().await;
914 self.handles.lock().iter().for_each(|handle| handle.abort());
916 }
917}
918
919#[cfg(test)]
920mod tests {
921 use crate::{
922 BFT,
923 DEVELOPMENT_MODE_RNG_SEED,
924 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
925 helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
926 };
927
928 use amareleo_chain_account::Account;
929 use amareleo_node_bft_ledger_service::MockLedgerService;
930 use amareleo_node_bft_storage_service::BFTMemoryService;
931 use snarkvm::{
932 console::account::{Address, PrivateKey},
933 ledger::{
934 committee::Committee,
935 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
936 },
937 utilities::TestRng,
938 };
939
940 use aleo_std::StorageMode;
941 use anyhow::Result;
942 use indexmap::{IndexMap, IndexSet};
943 use rand::SeedableRng;
944 use rand_chacha::ChaChaRng;
945 use std::sync::Arc;
946
947 type CurrentNetwork = snarkvm::console::network::MainnetV0;
948
949 fn sample_test_instance(
951 committee_round: Option<u64>,
952 max_gc_rounds: u64,
953 rng: &mut TestRng,
954 ) -> (
955 Committee<CurrentNetwork>,
956 Account<CurrentNetwork>,
957 Arc<MockLedgerService<CurrentNetwork>>,
958 Storage<CurrentNetwork>,
959 ) {
960 let committee = match committee_round {
961 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
962 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
963 };
964 let account = Account::new(rng).unwrap();
965 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
966 let transmissions = Arc::new(BFTMemoryService::new());
967 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
968
969 (committee, account, ledger, storage)
970 }
971
972 #[test]
973 #[tracing_test::traced_test]
974 fn test_is_leader_quorum_odd() -> Result<()> {
975 let rng = &mut TestRng::default();
976
977 let mut certificates = IndexSet::new();
979 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
980 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
981 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
982 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
983
984 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
986 1,
987 vec![
988 certificates[0].author(),
989 certificates[1].author(),
990 certificates[2].author(),
991 certificates[3].author(),
992 ],
993 rng,
994 );
995
996 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
998 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1000 let account = Account::new(rng)?;
1002 let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1004 assert!(bft.is_timer_expired());
1005 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1007 assert!(!result);
1009 for certificate in certificates.iter() {
1011 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1012 }
1013 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1015 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1018 *bft.leader_certificate.write() = Some(leader_certificate);
1019 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1021 assert!(result); Ok(())
1024 }
1025
1026 #[test]
1027 #[tracing_test::traced_test]
1028 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1029 let rng = &mut TestRng::default();
1030
1031 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1033 assert_eq!(committee.starting_round(), 1);
1034 assert_eq!(storage.current_round(), 1);
1035 assert_eq!(storage.max_gc_rounds(), 10);
1036
1037 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1039 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1044 assert!(!result);
1045 Ok(())
1046 }
1047
1048 #[test]
1049 #[tracing_test::traced_test]
1050 fn test_is_leader_quorum_even() -> Result<()> {
1051 let rng = &mut TestRng::default();
1052
1053 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1055 assert_eq!(committee.starting_round(), 2);
1056 assert_eq!(storage.current_round(), 2);
1057 assert_eq!(storage.max_gc_rounds(), 10);
1058
1059 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1061 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1065 assert!(!result);
1066 Ok(())
1067 }
1068
1069 #[test]
1070 #[tracing_test::traced_test]
1071 fn test_is_even_round_ready() -> Result<()> {
1072 let rng = &mut TestRng::default();
1073
1074 let mut certificates = IndexSet::new();
1076 certificates.insert(sample_batch_certificate_for_round(2, rng));
1077 certificates.insert(sample_batch_certificate_for_round(2, rng));
1078 certificates.insert(sample_batch_certificate_for_round(2, rng));
1079 certificates.insert(sample_batch_certificate_for_round(2, rng));
1080
1081 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1083 2,
1084 vec![
1085 certificates[0].author(),
1086 certificates[1].author(),
1087 certificates[2].author(),
1088 certificates[3].author(),
1089 ],
1090 rng,
1091 );
1092
1093 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1095 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1097 let account = Account::new(rng)?;
1099 let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1101 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1103 *bft.leader_certificate.write() = Some(leader_certificate);
1104 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1105 assert!(!result);
1107 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1109 assert!(result);
1110
1111 let bft_timer =
1113 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1114 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1116 if !bft_timer.is_timer_expired() {
1117 assert!(!result);
1118 }
1119 let leader_certificate_timeout =
1121 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1122 std::thread::sleep(leader_certificate_timeout);
1123 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1125 if bft_timer.is_timer_expired() {
1126 assert!(result);
1127 } else {
1128 assert!(!result);
1129 }
1130
1131 Ok(())
1132 }
1133
1134 #[test]
1135 #[tracing_test::traced_test]
1136 fn test_update_leader_certificate_odd() -> Result<()> {
1137 let rng = &mut TestRng::default();
1138
1139 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1141 assert_eq!(storage.max_gc_rounds(), 10);
1142
1143 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1145
1146 let result = bft.update_leader_certificate_to_even_round(1);
1148 assert!(!result);
1149 Ok(())
1150 }
1151
1152 #[test]
1153 #[tracing_test::traced_test]
1154 fn test_update_leader_certificate_bad_round() -> Result<()> {
1155 let rng = &mut TestRng::default();
1156
1157 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1159 assert_eq!(storage.max_gc_rounds(), 10);
1160
1161 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1163
1164 let result = bft.update_leader_certificate_to_even_round(6);
1166 assert!(!result);
1167 Ok(())
1168 }
1169
/// Checks that `update_leader_certificate_to_even_round(2)` returns `true` when storage is at
/// round 2 and the round-2 certificate of the chosen leader has been set on the BFT.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    use snarkvm::ledger::{
        committee::test_helpers::sample_committee_for_round_and_members,
        narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates,
    };

    let mut test_rng = TestRng::default();

    // Only the previous certificates of the sampled round-3 certificate are used.
    let (_, certificates) = sample_batch_certificate_with_previous_certificates(3, &mut test_rng);

    // The committee for round 2 consists of the authors of the first four certificates.
    let members: Vec<_> = (0..4usize).map(|index| certificates[index].author()).collect();
    let committee = sample_committee_for_round_and_members(2, members, &mut test_rng);

    // Back the storage with a mock ledger and an in-memory transmission store.
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);

    // Insert the first four certificates; afterwards storage must report round 2.
    for certificate in certificates.iter().take(4) {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert_eq!(storage.current_round(), 2);

    // The author of the first certificate acts as the leader for round 2.
    let leader = certificates[0].author();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();

    // Build the BFT and install the leader certificate directly.
    let account = Account::new(&mut test_rng)?;
    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    *bft.leader_certificate.write() = Some(leader_certificate);

    // The update for the even round 2 must succeed.
    assert!(bft.update_leader_certificate_to_even_round(2));

    Ok(())
}
1226
1227 #[tokio::test]
1228 #[tracing_test::traced_test]
1229 async fn test_order_dag_with_dfs() -> Result<()> {
1230 let rng = &mut TestRng::default();
1231
1232 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1234
1235 let previous_round = 2; let current_round = previous_round + 1;
1238
1239 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1241 current_round,
1242 rng,
1243 );
1244
1245 {
1249 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1251 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1253
1254 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1256
1257 for certificate in previous_certificates.clone() {
1259 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1260 }
1261
1262 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1264 assert!(result.is_ok());
1265 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1266 assert_eq!(candidate_certificates.len(), 1);
1267 let expected_certificates = vec![certificate.clone()];
1268 assert_eq!(
1269 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1270 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1271 );
1272 assert_eq!(candidate_certificates, expected_certificates);
1273 }
1274
1275 {
1279 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1281 let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1283
1284 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1286
1287 for certificate in previous_certificates.clone() {
1289 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1290 }
1291
1292 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1294 assert!(result.is_ok());
1295 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1296 assert_eq!(candidate_certificates.len(), 5);
1297 let expected_certificates = vec![
1298 previous_certificates[0].clone(),
1299 previous_certificates[1].clone(),
1300 previous_certificates[2].clone(),
1301 previous_certificates[3].clone(),
1302 certificate,
1303 ];
1304 assert_eq!(
1305 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1306 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1307 );
1308 assert_eq!(candidate_certificates, expected_certificates);
1309 }
1310
1311 Ok(())
1312 }
1313
/// Checks that `order_dag_with_dfs` fails with a "Missing previous certificate" error for a
/// round-3 certificate when no previous certificate was ever added to the DAG.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates;

    let mut test_rng = TestRng::default();

    // Sample the fixtures and sanity-check their starting state.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, &mut test_rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    let previous_round = 2;
    let current_round = previous_round + 1;
    let (certificate, previous_certificates) =
        sample_batch_certificate_with_previous_certificates(current_round, &mut test_rng);

    // The expected error names the fourth previous certificate ID and the previous round.
    let previous_certificate_ids = previous_certificates.iter().map(|c| c.id()).collect::<IndexSet<_>>();
    let expected_error = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    // Deliberately skip `update_dag`, so the DAG holds none of the previous certificates.
    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // The ordering must fail, and with exactly the expected message.
    let failure = bft.order_dag_with_dfs::<false>(certificate).err().map(|error| error.to_string());
    assert_eq!(failure, Some(expected_error));
    Ok(())
}
1354
1355 #[tokio::test]
1356 #[tracing_test::traced_test]
1357 async fn test_bft_gc_on_commit() -> Result<()> {
1358 let rng = &mut TestRng::default();
1359
1360 let max_gc_rounds = 1;
1362 let committee_round = 0;
1363 let commit_round = 2;
1364 let current_round = commit_round + 1;
1365
1366 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1368 current_round,
1369 rng,
1370 );
1371
1372 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1374 committee_round,
1375 vec![
1376 certificates[0].author(),
1377 certificates[1].author(),
1378 certificates[2].author(),
1379 certificates[3].author(),
1380 ],
1381 rng,
1382 );
1383
1384 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1386
1387 let transmissions = Arc::new(BFTMemoryService::new());
1389 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1390 for certificate in certificates.iter() {
1392 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1393 }
1394
1395 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1398
1399 let account = Account::new(rng)?;
1401 let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1402 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1404
1405 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1407
1408 for certificate in certificates {
1410 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1411 }
1412
1413 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1415
1416 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1418
1419 Ok(())
1420 }
1421
1422 #[tokio::test]
1423 #[tracing_test::traced_test]
1424 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1425 let rng = &mut TestRng::default();
1426
1427 let max_gc_rounds = 1;
1429 let committee_round = 0;
1430 let commit_round = 2;
1431 let current_round = commit_round + 1;
1432
1433 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1435 current_round,
1436 rng,
1437 );
1438
1439 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1441 committee_round,
1442 vec![
1443 certificates[0].author(),
1444 certificates[1].author(),
1445 certificates[2].author(),
1446 certificates[3].author(),
1447 ],
1448 rng,
1449 );
1450
1451 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1453
1454 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1456 for certificate in certificates.iter() {
1458 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1459 }
1460
1461 let leader = certificates[0].author(); let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1464
1465 let account = Account::new(rng)?;
1467 let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1468
1469 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1471
1472 for certificate in certificates.clone() {
1474 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1475 }
1476
1477 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1479
1480 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1484 let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1486
1487 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1489
1490 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1492
1493 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1495 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1496
1497 for certificate in certificates {
1499 let certificate_round = certificate.round();
1500 let certificate_id = certificate.id();
1501 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1503 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1506 }
1507
1508 Ok(())
1509 }
1510
1511 #[tokio::test]
1512 #[tracing_test::traced_test]
1513 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1514 let rng = &mut TestRng::default();
1521 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1522
1523 let private_keys = vec![
1524 PrivateKey::new(rng_pks).unwrap(),
1525 PrivateKey::new(rng_pks).unwrap(),
1526 PrivateKey::new(rng_pks).unwrap(),
1527 PrivateKey::new(rng_pks).unwrap(),
1528 ];
1529 let address0 = Address::try_from(private_keys[0])?;
1530
1531 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1533 let committee_round = 0;
1534 let commit_round = 2;
1535 let current_round = commit_round + 1;
1536 let next_round = current_round + 1;
1537
1538 let (round_to_certificates_map, committee) = {
1540 let addresses = vec![
1541 Address::try_from(private_keys[0])?,
1542 Address::try_from(private_keys[1])?,
1543 Address::try_from(private_keys[2])?,
1544 Address::try_from(private_keys[3])?,
1545 ];
1546 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1547 committee_round,
1548 addresses,
1549 rng,
1550 );
1551 let mut round_to_certificates_map: IndexMap<
1553 u64,
1554 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1555 > = IndexMap::new();
1556 let mut previous_certificates = IndexSet::with_capacity(4);
1557 for _ in 0..4 {
1559 previous_certificates.insert(sample_batch_certificate(rng));
1560 }
1561 for round in 0..commit_round + 3 {
1562 let mut current_certificates = IndexSet::new();
1563 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1564 IndexSet::new()
1565 } else {
1566 previous_certificates.iter().map(|c| c.id()).collect()
1567 };
1568 let transmission_ids =
1569 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1570 .into_iter()
1571 .collect::<IndexSet<_>>();
1572 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1573 let committee_id = committee.id();
1574 for (i, private_key_1) in private_keys.iter().enumerate() {
1575 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1576 private_key_1,
1577 round,
1578 timestamp,
1579 committee_id,
1580 transmission_ids.clone(),
1581 previous_certificate_ids.clone(),
1582 rng,
1583 )
1584 .unwrap();
1585 let mut signatures = IndexSet::with_capacity(4);
1586 for (j, private_key_2) in private_keys.iter().enumerate() {
1587 if i != j {
1588 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1589 }
1590 }
1591 let certificate =
1592 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1593 current_certificates.insert(certificate);
1594 }
1595 round_to_certificates_map.insert(round, current_certificates.clone());
1597 previous_certificates = current_certificates.clone();
1598 }
1599 (round_to_certificates_map, committee)
1600 };
1601
1602 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1604 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1606 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1611 for i in 1..=commit_round {
1612 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1613 if i == commit_round {
1614 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1616 if let Some(c) = leader_certificate {
1617 pre_shutdown_certificates.push(c.clone());
1618 }
1619 continue;
1620 }
1621 pre_shutdown_certificates.extend(certificates);
1622 }
1623 for certificate in pre_shutdown_certificates.iter() {
1624 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1625 }
1626 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1628 Vec::new();
1629 for j in commit_round..=commit_round + 2 {
1630 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1631 post_shutdown_certificates.extend(certificate);
1632 }
1633 for certificate in post_shutdown_certificates.iter() {
1634 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1635 }
1636 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1638 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1639
1640 let account = Account::try_from(private_keys[0])?;
1642 let ledger_dir = default_ledger_dir(1, true, "0");
1643 let storage_mode = amareleo_storage_mode(ledger_dir);
1644 let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1645
1646 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1648
1649 for certificate in pre_shutdown_certificates.clone() {
1651 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1652 }
1653
1654 for certificate in post_shutdown_certificates.clone() {
1656 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1657 }
1658 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1660 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1661 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1662
1663 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1667
1668 let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1670
1671 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1673
1674 for certificate in post_shutdown_certificates.iter() {
1676 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1677 }
1678 for certificate in post_shutdown_certificates.clone() {
1679 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1680 }
1681 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1683 let commit_subdag_metadata_bootup =
1684 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1685 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1686 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1687
1688 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1692
1693 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1695 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1696 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1697 assert!(
1698 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1699 );
1700
1701 for certificate in pre_shutdown_certificates.clone() {
1703 let certificate_round = certificate.round();
1704 let certificate_id = certificate.id();
1705 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1707 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1708 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1711 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1712 }
1713
1714 for certificate in committed_certificates_bootup.clone() {
1716 let certificate_round = certificate.round();
1717 let certificate_id = certificate.id();
1718 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1720 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1721 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1724 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1725 }
1726
1727 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1729
1730 Ok(())
1731 }
1732
1733 #[tokio::test]
1734 #[tracing_test::traced_test]
1735 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1736 let rng = &mut TestRng::default();
1743 let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1744
1745 let private_keys = vec![
1746 PrivateKey::new(rng_pks).unwrap(),
1747 PrivateKey::new(rng_pks).unwrap(),
1748 PrivateKey::new(rng_pks).unwrap(),
1749 PrivateKey::new(rng_pks).unwrap(),
1750 ];
1751 let address0 = Address::try_from(private_keys[0])?;
1752
1753 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1755 let committee_round = 0;
1756 let commit_round = 2;
1757 let current_round = commit_round + 1;
1758 let next_round = current_round + 1;
1759
1760 let (round_to_certificates_map, committee) = {
1762 let addresses = vec![
1763 Address::try_from(private_keys[0])?,
1764 Address::try_from(private_keys[1])?,
1765 Address::try_from(private_keys[2])?,
1766 Address::try_from(private_keys[3])?,
1767 ];
1768 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1769 committee_round,
1770 addresses,
1771 rng,
1772 );
1773 let mut round_to_certificates_map: IndexMap<
1775 u64,
1776 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1777 > = IndexMap::new();
1778 let mut previous_certificates = IndexSet::with_capacity(4);
1779 for _ in 0..4 {
1781 previous_certificates.insert(sample_batch_certificate(rng));
1782 }
1783 for round in 0..=commit_round + 2 {
1784 let mut current_certificates = IndexSet::new();
1785 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1786 IndexSet::new()
1787 } else {
1788 previous_certificates.iter().map(|c| c.id()).collect()
1789 };
1790 let transmission_ids =
1791 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1792 .into_iter()
1793 .collect::<IndexSet<_>>();
1794 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1795 let committee_id = committee.id();
1796 for (i, private_key_1) in private_keys.iter().enumerate() {
1797 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1798 private_key_1,
1799 round,
1800 timestamp,
1801 committee_id,
1802 transmission_ids.clone(),
1803 previous_certificate_ids.clone(),
1804 rng,
1805 )
1806 .unwrap();
1807 let mut signatures = IndexSet::with_capacity(4);
1808 for (j, private_key_2) in private_keys.iter().enumerate() {
1809 if i != j {
1810 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1811 }
1812 }
1813 let certificate =
1814 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1815 current_certificates.insert(certificate);
1816 }
1817 round_to_certificates_map.insert(round, current_certificates.clone());
1819 previous_certificates = current_certificates.clone();
1820 }
1821 (round_to_certificates_map, committee)
1822 };
1823
1824 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1826 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1828 let leader = address0; let next_leader = address0; let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1833 for i in 1..=commit_round {
1834 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1835 if i == commit_round {
1836 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1838 if let Some(c) = leader_certificate {
1839 pre_shutdown_certificates.push(c.clone());
1840 }
1841 continue;
1842 }
1843 pre_shutdown_certificates.extend(certificates);
1844 }
1845 for certificate in pre_shutdown_certificates.iter() {
1846 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1847 }
1848 let account = Account::try_from(private_keys[0])?;
1850 let bootup_bft =
1851 BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1852 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1854 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1856
1857 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1859 Vec::new();
1860 for j in commit_round..=commit_round + 2 {
1861 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1862 post_shutdown_certificates.extend(certificate);
1863 }
1864 for certificate in post_shutdown_certificates.iter() {
1865 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1866 }
1867
1868 for certificate in post_shutdown_certificates.clone() {
1870 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1871 }
1872
1873 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1875 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1876 let committed_certificates = commit_subdag.values().flatten();
1877
1878 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1880 for committed_certificate in committed_certificates.clone() {
1881 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1882 }
1883 }
1884 Ok(())
1885 }
1886}