1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkvm::{
34 console::account::Address,
35 ledger::{
36 block::Transaction,
37 committee::Committee,
38 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
39 puzzle::{Solution, SolutionID},
40 },
41 prelude::{Field, Network, Result, bail, ensure},
42};
43
44use aleo_std::StorageMode;
45use colored::Colorize;
46use indexmap::{IndexMap, IndexSet};
47#[cfg(feature = "locktick")]
48use locktick::{
49 parking_lot::{Mutex, RwLock},
50 tokio::Mutex as TMutex,
51};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54use std::{
55 collections::{BTreeMap, HashSet},
56 future::Future,
57 net::SocketAddr,
58 sync::{
59 Arc,
60 atomic::{AtomicI64, Ordering},
61 },
62};
63#[cfg(not(feature = "locktick"))]
64use tokio::sync::Mutex as TMutex;
65use tokio::{
66 sync::{OnceCell, oneshot},
67 task::JoinHandle,
68};
69
70#[derive(Clone)]
71pub struct BFT<N: Network> {
72 primary: Primary<N>,
74 dag: Arc<RwLock<DAG<N>>>,
76 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
78 leader_certificate_timer: Arc<AtomicI64>,
80 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
82 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
84 lock: Arc<TMutex<()>>,
86}
87
88impl<N: Network> BFT<N> {
89 pub fn new(
91 account: Account<N>,
92 storage: Storage<N>,
93 ledger: Arc<dyn LedgerService<N>>,
94 ip: Option<SocketAddr>,
95 trusted_validators: &[SocketAddr],
96 storage_mode: StorageMode,
97 ) -> Result<Self> {
98 Ok(Self {
99 primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?,
100 dag: Default::default(),
101 leader_certificate: Default::default(),
102 leader_certificate_timer: Default::default(),
103 consensus_sender: Default::default(),
104 handles: Default::default(),
105 lock: Default::default(),
106 })
107 }
108
109 pub async fn run(
111 &mut self,
112 consensus_sender: Option<ConsensusSender<N>>,
113 primary_sender: PrimarySender<N>,
114 primary_receiver: PrimaryReceiver<N>,
115 ) -> Result<()> {
116 info!("Starting the BFT instance...");
117 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
119 self.start_handlers(bft_receiver);
121 self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
123 if let Some(consensus_sender) = consensus_sender {
126 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
127 }
128 Ok(())
129 }
130
131 pub fn is_synced(&self) -> bool {
133 self.primary.is_synced()
134 }
135
136 pub const fn primary(&self) -> &Primary<N> {
138 &self.primary
139 }
140
141 pub const fn storage(&self) -> &Storage<N> {
143 self.primary.storage()
144 }
145
146 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
148 self.primary.ledger()
149 }
150
151 pub fn leader(&self) -> Option<Address<N>> {
153 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
154 }
155
156 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
158 &self.leader_certificate
159 }
160}
161
162impl<N: Network> BFT<N> {
163 pub fn num_unconfirmed_transmissions(&self) -> usize {
165 self.primary.num_unconfirmed_transmissions()
166 }
167
168 pub fn num_unconfirmed_ratifications(&self) -> usize {
170 self.primary.num_unconfirmed_ratifications()
171 }
172
173 pub fn num_unconfirmed_solutions(&self) -> usize {
175 self.primary.num_unconfirmed_solutions()
176 }
177
178 pub fn num_unconfirmed_transactions(&self) -> usize {
180 self.primary.num_unconfirmed_transactions()
181 }
182}
183
184impl<N: Network> BFT<N> {
185 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
187 self.primary.worker_transmission_ids()
188 }
189
190 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
192 self.primary.worker_transmissions()
193 }
194
195 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
197 self.primary.worker_solutions()
198 }
199
200 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
202 self.primary.worker_transactions()
203 }
204}
205
206impl<N: Network> BFT<N> {
207 fn update_to_next_round(&self, current_round: u64) -> bool {
209 let storage_round = self.storage().current_round();
211 if current_round < storage_round {
212 debug!(
213 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
214 );
215 return false;
216 }
217
218 let is_ready = match current_round % 2 == 0 {
220 true => self.update_leader_certificate_to_even_round(current_round),
221 false => self.is_leader_quorum_or_nonleaders_available(current_round),
222 };
223
224 #[cfg(feature = "metrics")]
225 {
226 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
227 if start > 0 {
229 let end = now();
230 let elapsed = std::time::Duration::from_secs((end - start) as u64);
231 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
232 }
233 }
234
235 if current_round % 2 == 0 {
237 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
239 if !is_ready {
241 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
242 }
243 let leader_round = leader_certificate.round();
245 match leader_round == current_round {
246 true => {
247 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
248 #[cfg(feature = "metrics")]
249 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
250 }
251 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
252 }
253 } else {
254 match is_ready {
255 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
256 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
257 }
258 }
259 }
260
261 if is_ready {
263 if let Err(e) = self.storage().increment_to_next_round(current_round) {
265 warn!("BFT failed to increment to the next round from round {current_round} - {e}");
266 return false;
267 }
268 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
270 }
271
272 is_ready
273 }
274
275 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
281 let current_round = self.storage().current_round();
283 if current_round != even_round {
285 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
286 return false;
287 }
288
289 if current_round % 2 != 0 || current_round < 2 {
291 error!("BFT cannot update the leader certificate in an odd round");
292 return false;
293 }
294
295 let current_certificates = self.storage().get_certificates_for_round(current_round);
297 if current_certificates.is_empty() {
299 *self.leader_certificate.write() = None;
301 return false;
302 }
303
304 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
306 Ok(committee) => committee,
307 Err(e) => {
308 error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
309 return false;
310 }
311 };
312 let leader = match self.ledger().latest_leader() {
314 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
315 _ => {
316 let computed_leader = match committee_lookback.get_leader(current_round) {
318 Ok(leader) => leader,
319 Err(e) => {
320 error!("BFT failed to compute the leader for the even round {current_round} - {e}");
321 return false;
322 }
323 };
324
325 self.ledger().update_latest_leader(current_round, computed_leader);
327
328 computed_leader
329 }
330 };
331 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
333 *self.leader_certificate.write() = leader_certificate.cloned();
334
335 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
336 }
337
338 fn is_even_round_ready_for_next_round(
342 &self,
343 certificates: IndexSet<BatchCertificate<N>>,
344 committee: Committee<N>,
345 current_round: u64,
346 ) -> bool {
347 let authors = certificates.into_iter().map(|c| c.author()).collect();
349 if !committee.is_quorum_threshold_reached(&authors) {
351 trace!("BFT failed to reach quorum threshold in even round {current_round}");
352 return false;
353 }
354 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
356 if leader_certificate.round() == current_round {
357 return true;
358 }
359 }
360 if self.is_timer_expired() {
362 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
363 return true;
364 }
365 false
367 }
368
369 fn is_timer_expired(&self) -> bool {
371 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
372 }
373
374 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
379 let current_round = self.storage().current_round();
381 if current_round != odd_round {
383 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
384 return false;
385 }
386 if current_round % 2 != 1 {
388 error!("BFT does not compute stakes for the leader certificate in an even round");
389 return false;
390 }
391 let current_certificates = self.storage().get_certificates_for_round(current_round);
393 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
395 Ok(committee) => committee,
396 Err(e) => {
397 error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
398 return false;
399 }
400 };
401 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
403 if !committee_lookback.is_quorum_threshold_reached(&authors) {
405 trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
406 return false;
407 }
408 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
410 return true;
412 };
413 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
415 leader_certificate.id(),
416 current_certificates,
417 &committee_lookback,
418 );
419 stake_with_leader >= committee_lookback.availability_threshold()
421 || stake_without_leader >= committee_lookback.quorum_threshold()
422 || self.is_timer_expired()
423 }
424
425 fn compute_stake_for_leader_certificate(
427 &self,
428 leader_certificate_id: Field<N>,
429 current_certificates: IndexSet<BatchCertificate<N>>,
430 current_committee: &Committee<N>,
431 ) -> (u64, u64) {
432 if current_certificates.is_empty() {
434 return (0, 0);
435 }
436
437 let mut stake_with_leader = 0u64;
439 let mut stake_without_leader = 0u64;
441 for certificate in current_certificates {
443 let stake = current_committee.get_stake(certificate.author());
445 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
447 true => stake_with_leader = stake_with_leader.saturating_add(stake),
449 false => stake_without_leader = stake_without_leader.saturating_add(stake),
451 }
452 }
453 (stake_with_leader, stake_without_leader)
455 }
456}
457
458impl<N: Network> BFT<N> {
459 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
461 &self,
462 certificate: BatchCertificate<N>,
463 ) -> Result<()> {
464 let _lock = self.lock.lock().await;
466
467 let certificate_round = certificate.round();
469 self.dag.write().insert(certificate);
471
472 let commit_round = certificate_round.saturating_sub(1);
474 if commit_round % 2 != 0 || commit_round < 2 {
476 return Ok(());
477 }
478 if commit_round <= self.dag.read().last_committed_round() {
480 return Ok(());
481 }
482
483 trace!("Checking if the leader is ready to be committed for round {commit_round}...");
485
486 let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
488 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
489 };
490
491 let leader = match self.ledger().latest_leader() {
493 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
494 _ => {
495 let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
497 bail!("BFT failed to compute the leader for commit round {commit_round}");
498 };
499
500 self.ledger().update_latest_leader(commit_round, computed_leader);
502
503 computed_leader
504 }
505 };
506
507 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
509 else {
510 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
511 return Ok(());
512 };
513 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
515 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
517 };
518 let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
520 else {
521 bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
522 };
523 let authors = certificates
525 .values()
526 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
527 true => Some(c.author()),
528 false => None,
529 })
530 .collect();
531 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
533 trace!("BFT is not ready to commit {commit_round}");
535 return Ok(());
536 }
537
538 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
540
541 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
543 }
544
545 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
547 &self,
548 leader_certificate: BatchCertificate<N>,
549 ) -> Result<()> {
550 let latest_leader_round = leader_certificate.round();
552 let mut leader_certificates = vec![leader_certificate.clone()];
555 {
556 let leader_round = leader_certificate.round();
558
559 let mut current_certificate = leader_certificate;
560 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
561 {
562 let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
564 Ok(committee) => committee,
565 Err(e) => {
566 bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
567 }
568 };
569 let leader = match self.ledger().latest_leader() {
571 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
572 _ => {
573 let computed_leader = match previous_committee_lookback.get_leader(round) {
575 Ok(leader) => leader,
576 Err(e) => {
577 bail!("BFT failed to compute the leader for the even round {round} - {e}");
578 }
579 };
580
581 self.ledger().update_latest_leader(round, computed_leader);
583
584 computed_leader
585 }
586 };
587 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
589 else {
590 continue;
591 };
592 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
594 leader_certificates.push(previous_certificate.clone());
596 current_certificate = previous_certificate;
598 }
599 }
600 }
601
602 for leader_certificate in leader_certificates.into_iter().rev() {
604 let leader_round = leader_certificate.round();
606 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
608 Ok(subdag) => subdag,
609 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
610 };
611 if !IS_SYNCING {
613 let mut transmissions = IndexMap::new();
615 let mut seen_transaction_ids = IndexSet::new();
617 let mut seen_solution_ids = IndexSet::new();
619 for certificate in commit_subdag.values().flatten() {
621 for transmission_id in certificate.transmission_ids() {
623 match transmission_id {
627 TransmissionID::Solution(solution_id, _) => {
628 if seen_solution_ids.contains(&solution_id) {
630 continue;
631 }
632 }
633 TransmissionID::Transaction(transaction_id, _) => {
634 if seen_transaction_ids.contains(transaction_id) {
636 continue;
637 }
638 }
639 TransmissionID::Ratification => {
640 bail!("Ratifications are currently not supported in the BFT.")
641 }
642 }
643 if transmissions.contains_key(transmission_id) {
645 continue;
646 }
647 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
650 continue;
651 }
652 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
654 bail!(
655 "BFT failed to retrieve transmission '{}.{}' from round {}",
656 fmt_id(transmission_id),
657 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
658 certificate.round()
659 );
660 };
661 match transmission_id {
663 TransmissionID::Solution(id, _) => {
664 seen_solution_ids.insert(id);
665 }
666 TransmissionID::Transaction(id, _) => {
667 seen_transaction_ids.insert(id);
668 }
669 TransmissionID::Ratification => {}
670 }
671 transmissions.insert(*transmission_id, transmission);
673 }
674 }
675 let subdag = Subdag::from(commit_subdag.clone())?;
678 let anchor_round = subdag.anchor_round();
680 let num_transmissions = transmissions.len();
682 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
684
685 ensure!(
687 anchor_round == leader_round,
688 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
689 );
690
691 if let Some(consensus_sender) = self.consensus_sender.get() {
693 let (callback_sender, callback_receiver) = oneshot::channel();
695 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
697 match callback_receiver.await {
699 Ok(Ok(())) => (), Ok(Err(e)) => {
701 error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
702 return Ok(());
703 }
704 Err(e) => {
705 error!("BFT failed to receive the callback for round {anchor_round} - {e}");
706 return Ok(());
707 }
708 }
709 }
710
711 info!(
712 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
713 );
714 }
715
716 let mut dag_write = self.dag.write();
718 for certificate in commit_subdag.values().flatten() {
719 dag_write.commit(certificate, self.storage().max_gc_rounds());
720 }
721
722 #[cfg(feature = "telemetry")]
724 self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
725 }
726
727 self.storage().garbage_collect_certificates(latest_leader_round);
729
730 Ok(())
731 }
732
733 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
735 &self,
736 leader_certificate: BatchCertificate<N>,
737 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
738 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
740 let mut already_ordered = HashSet::new();
742 let mut buffer = vec![leader_certificate];
744 while let Some(certificate) = buffer.pop() {
746 commit.entry(certificate.round()).or_default().insert(certificate.clone());
748
749 let previous_round = certificate.round().saturating_sub(1);
751 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
752 continue;
753 }
754 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
758 if already_ordered.contains(previous_certificate_id) {
760 continue;
761 }
762 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
764 continue;
765 }
766 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
768 continue;
769 }
770
771 let previous_certificate = {
773 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
775 Some(previous_certificate) => previous_certificate,
777 None => match self.storage().get_certificate(*previous_certificate_id) {
779 Some(previous_certificate) => previous_certificate,
781 None => bail!(
783 "Missing previous certificate {} for round {previous_round}",
784 fmt_id(previous_certificate_id)
785 ),
786 },
787 }
788 };
789 already_ordered.insert(previous_certificate.id());
791 buffer.push(previous_certificate);
793 }
794 }
795 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
797 Ok(commit)
799 }
800
801 fn is_linked(
803 &self,
804 previous_certificate: BatchCertificate<N>,
805 current_certificate: BatchCertificate<N>,
806 ) -> Result<bool> {
807 let mut traversal = vec![current_certificate.clone()];
809 for round in (previous_certificate.round()..current_certificate.round()).rev() {
811 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
813 bail!("BFT failed to retrieve the certificates for past round {round}");
816 };
817 traversal = certificates
819 .into_values()
820 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
821 .collect();
822 }
823 Ok(traversal.contains(&previous_certificate))
824 }
825}
826
827impl<N: Network> BFT<N> {
828 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
830 let BFTReceiver {
831 mut rx_primary_round,
832 mut rx_primary_certificate,
833 mut rx_sync_bft_dag_at_bootup,
834 mut rx_sync_bft,
835 } = bft_receiver;
836
837 let self_ = self.clone();
839 self.spawn(async move {
840 while let Some((current_round, callback)) = rx_primary_round.recv().await {
841 callback.send(self_.update_to_next_round(current_round)).ok();
842 }
843 });
844
845 let self_ = self.clone();
847 self.spawn(async move {
848 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
849 let result = self_.update_dag::<true, false>(certificate).await;
851 callback.send(result).ok();
854 }
855 });
856
857 let self_ = self.clone();
859 self.spawn(async move {
860 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
861 self_.sync_bft_dag_at_bootup(certificates).await;
862 }
863 });
864
865 let self_ = self.clone();
867 self.spawn(async move {
868 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
869 let result = self_.update_dag::<true, true>(certificate).await;
871 callback.send(result).ok();
874 }
875 });
876 }
877
878 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
885 let mut dag = self.dag.write();
887
888 for certificate in certificates {
890 dag.commit(&certificate, self.storage().max_gc_rounds());
891 }
892 }
893
894 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
896 self.handles.lock().push(tokio::spawn(future));
897 }
898
899 pub async fn shut_down(&self) {
901 info!("Shutting down the BFT...");
902 let _lock = self.lock.lock().await;
904 self.primary.shut_down().await;
906 self.handles.lock().iter().for_each(|handle| handle.abort());
908 }
909}
910
911#[cfg(test)]
912mod tests {
913 use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
914 use snarkos_account::Account;
915 use snarkos_node_bft_ledger_service::MockLedgerService;
916 use snarkos_node_bft_storage_service::BFTMemoryService;
917 use snarkvm::{
918 console::account::{Address, PrivateKey},
919 ledger::{
920 committee::Committee,
921 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
922 },
923 utilities::TestRng,
924 };
925
926 use aleo_std::StorageMode;
927 use anyhow::Result;
928 use indexmap::{IndexMap, IndexSet};
929 use std::sync::Arc;
930
931 type CurrentNetwork = snarkvm::console::network::MainnetV0;
932
933 fn sample_test_instance(
935 committee_round: Option<u64>,
936 max_gc_rounds: u64,
937 rng: &mut TestRng,
938 ) -> (
939 Committee<CurrentNetwork>,
940 Account<CurrentNetwork>,
941 Arc<MockLedgerService<CurrentNetwork>>,
942 Storage<CurrentNetwork>,
943 ) {
944 let committee = match committee_round {
945 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
946 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
947 };
948 let account = Account::new(rng).unwrap();
949 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
950 let transmissions = Arc::new(BFTMemoryService::new());
951 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
952
953 (committee, account, ledger, storage)
954 }
955
956 #[test]
957 #[tracing_test::traced_test]
958 fn test_is_leader_quorum_odd() -> Result<()> {
959 let rng = &mut TestRng::default();
960
961 let mut certificates = IndexSet::new();
963 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
964 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
965 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
966 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
967
968 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
970 1,
971 vec![
972 certificates[0].author(),
973 certificates[1].author(),
974 certificates[2].author(),
975 certificates[3].author(),
976 ],
977 rng,
978 );
979
980 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
982 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
984 let account = Account::new(rng)?;
986 let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
988 assert!(bft.is_timer_expired());
989 let result = bft.is_leader_quorum_or_nonleaders_available(1);
991 assert!(!result);
993 for certificate in certificates.iter() {
995 storage.testing_only_insert_certificate_testing_only(certificate.clone());
996 }
997 let result = bft.is_leader_quorum_or_nonleaders_available(1);
999 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1002 *bft.leader_certificate.write() = Some(leader_certificate);
1003 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1005 assert!(result); Ok(())
1008 }
1009
1010 #[test]
1011 #[tracing_test::traced_test]
1012 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1013 let rng = &mut TestRng::default();
1014
1015 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1017 assert_eq!(committee.starting_round(), 1);
1018 assert_eq!(storage.current_round(), 1);
1019 assert_eq!(storage.max_gc_rounds(), 10);
1020
1021 let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1023 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1028 assert!(!result);
1029 Ok(())
1030 }
1031
1032 #[test]
1033 #[tracing_test::traced_test]
1034 fn test_is_leader_quorum_even() -> Result<()> {
1035 let rng = &mut TestRng::default();
1036
1037 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1039 assert_eq!(committee.starting_round(), 2);
1040 assert_eq!(storage.current_round(), 2);
1041 assert_eq!(storage.max_gc_rounds(), 10);
1042
1043 let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1045 assert!(bft.is_timer_expired()); let result = bft.is_leader_quorum_or_nonleaders_available(2);
1049 assert!(!result);
1050 Ok(())
1051 }
1052
1053 #[test]
1054 #[tracing_test::traced_test]
1055 fn test_is_even_round_ready() -> Result<()> {
1056 let rng = &mut TestRng::default();
1057
1058 let mut certificates = IndexSet::new();
1060 certificates.insert(sample_batch_certificate_for_round(2, rng));
1061 certificates.insert(sample_batch_certificate_for_round(2, rng));
1062 certificates.insert(sample_batch_certificate_for_round(2, rng));
1063 certificates.insert(sample_batch_certificate_for_round(2, rng));
1064
1065 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1067 2,
1068 vec![
1069 certificates[0].author(),
1070 certificates[1].author(),
1071 certificates[2].author(),
1072 certificates[3].author(),
1073 ],
1074 rng,
1075 );
1076
1077 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1079 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1081 let account = Account::new(rng)?;
1083 let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1085 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1087 *bft.leader_certificate.write() = Some(leader_certificate);
1088 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1089 assert!(!result);
1091 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1093 assert!(result);
1094
1095 let bft_timer =
1097 BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1098 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1100 if !bft_timer.is_timer_expired() {
1101 assert!(!result);
1102 }
1103 let leader_certificate_timeout =
1105 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1106 std::thread::sleep(leader_certificate_timeout);
1107 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1109 if bft_timer.is_timer_expired() {
1110 assert!(result);
1111 } else {
1112 assert!(!result);
1113 }
1114
1115 Ok(())
1116 }
1117
1118 #[test]
1119 #[tracing_test::traced_test]
1120 fn test_update_leader_certificate_odd() -> Result<()> {
1121 let rng = &mut TestRng::default();
1122
1123 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1125 assert_eq!(storage.max_gc_rounds(), 10);
1126
1127 let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1129
1130 let result = bft.update_leader_certificate_to_even_round(1);
1132 assert!(!result);
1133 Ok(())
1134 }
1135
1136 #[test]
1137 #[tracing_test::traced_test]
1138 fn test_update_leader_certificate_bad_round() -> Result<()> {
1139 let rng = &mut TestRng::default();
1140
1141 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1143 assert_eq!(storage.max_gc_rounds(), 10);
1144
1145 let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1147
1148 let result = bft.update_leader_certificate_to_even_round(6);
1150 assert!(!result);
1151 Ok(())
1152 }
1153
1154 #[test]
1155 #[tracing_test::traced_test]
1156 fn test_update_leader_certificate_even() -> Result<()> {
1157 let rng = &mut TestRng::default();
1158
1159 let current_round = 3;
1161
1162 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1164 current_round,
1165 rng,
1166 );
1167
1168 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1170 2,
1171 vec![
1172 certificates[0].author(),
1173 certificates[1].author(),
1174 certificates[2].author(),
1175 certificates[3].author(),
1176 ],
1177 rng,
1178 );
1179
1180 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1182
1183 let transmissions = Arc::new(BFTMemoryService::new());
1185 let storage = Storage::new(ledger.clone(), transmissions, 10);
1186 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1187 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1188 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1189 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1190 assert_eq!(storage.current_round(), 2);
1191
1192 let leader = committee.get_leader(2).unwrap();
1194 let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1195
1196 let account = Account::new(rng)?;
1198 let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
1199
1200 *bft.leader_certificate.write() = Some(leader_certificate);
1202
1203 let result = bft.update_leader_certificate_to_even_round(2);
1206 assert!(result);
1207
1208 Ok(())
1209 }
1210
1211 #[tokio::test]
1212 #[tracing_test::traced_test]
1213 async fn test_order_dag_with_dfs() -> Result<()> {
1214 let rng = &mut TestRng::default();
1215
1216 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1218
1219 let previous_round = 2; let current_round = previous_round + 1;
1222
1223 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1225 current_round,
1226 rng,
1227 );
1228
1229 {
1233 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1235 let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1237
1238 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1240
1241 for certificate in previous_certificates.clone() {
1243 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1244 }
1245
1246 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1248 assert!(result.is_ok());
1249 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1250 assert_eq!(candidate_certificates.len(), 1);
1251 let expected_certificates = vec![certificate.clone()];
1252 assert_eq!(
1253 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1254 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1255 );
1256 assert_eq!(candidate_certificates, expected_certificates);
1257 }
1258
1259 {
1263 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1265 let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1267
1268 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1270
1271 for certificate in previous_certificates.clone() {
1273 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1274 }
1275
1276 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1278 assert!(result.is_ok());
1279 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1280 assert_eq!(candidate_certificates.len(), 5);
1281 let expected_certificates = vec![
1282 previous_certificates[0].clone(),
1283 previous_certificates[1].clone(),
1284 previous_certificates[2].clone(),
1285 previous_certificates[3].clone(),
1286 certificate,
1287 ];
1288 assert_eq!(
1289 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1290 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1291 );
1292 assert_eq!(candidate_certificates, expected_certificates);
1293 }
1294
1295 Ok(())
1296 }
1297
1298 #[test]
1299 #[tracing_test::traced_test]
1300 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1301 let rng = &mut TestRng::default();
1302
1303 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1305 assert_eq!(committee.starting_round(), 1);
1306 assert_eq!(storage.current_round(), 1);
1307 assert_eq!(storage.max_gc_rounds(), 1);
1308
1309 let previous_round = 2; let current_round = previous_round + 1;
1312
1313 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1315 current_round,
1316 rng,
1317 );
1318 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1320
1321 let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1325
1326 let error_msg = format!(
1328 "Missing previous certificate {} for round {previous_round}",
1329 crate::helpers::fmt_id(previous_certificate_ids[3]),
1330 );
1331
1332 let result = bft.order_dag_with_dfs::<false>(certificate);
1334 assert!(result.is_err());
1335 assert_eq!(result.unwrap_err().to_string(), error_msg);
1336 Ok(())
1337 }
1338
1339 #[tokio::test]
1340 #[tracing_test::traced_test]
1341 async fn test_bft_gc_on_commit() -> Result<()> {
1342 let rng = &mut TestRng::default();
1343
1344 let max_gc_rounds = 1;
1346 let committee_round = 0;
1347 let commit_round = 2;
1348 let current_round = commit_round + 1;
1349
1350 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1352 current_round,
1353 rng,
1354 );
1355
1356 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1358 committee_round,
1359 vec![
1360 certificates[0].author(),
1361 certificates[1].author(),
1362 certificates[2].author(),
1363 certificates[3].author(),
1364 ],
1365 rng,
1366 );
1367
1368 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1370
1371 let transmissions = Arc::new(BFTMemoryService::new());
1373 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1374 for certificate in certificates.iter() {
1376 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1377 }
1378
1379 let leader = committee.get_leader(commit_round).unwrap();
1381 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1382
1383 let account = Account::new(rng)?;
1385 let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
1386 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1388
1389 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1391
1392 for certificate in certificates {
1394 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1395 }
1396
1397 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1399
1400 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1402
1403 Ok(())
1404 }
1405
1406 #[tokio::test]
1407 #[tracing_test::traced_test]
1408 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1409 let rng = &mut TestRng::default();
1410
1411 let max_gc_rounds = 1;
1413 let committee_round = 0;
1414 let commit_round = 2;
1415 let current_round = commit_round + 1;
1416
1417 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1419 current_round,
1420 rng,
1421 );
1422
1423 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1425 committee_round,
1426 vec![
1427 certificates[0].author(),
1428 certificates[1].author(),
1429 certificates[2].author(),
1430 certificates[3].author(),
1431 ],
1432 rng,
1433 );
1434
1435 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1437
1438 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1440 for certificate in certificates.iter() {
1442 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1443 }
1444
1445 let leader = committee.get_leader(commit_round).unwrap();
1447 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1448
1449 let account = Account::new(rng)?;
1451 let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1452
1453 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1455
1456 for certificate in certificates.clone() {
1458 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1459 }
1460
1461 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1463
1464 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1468 let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?;
1470
1471 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1473
1474 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1476
1477 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1479 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1480
1481 for certificate in certificates {
1483 let certificate_round = certificate.round();
1484 let certificate_id = certificate.id();
1485 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1487 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1490 }
1491
1492 Ok(())
1493 }
1494
1495 #[tokio::test]
1496 #[tracing_test::traced_test]
1497 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1498 let rng = &mut TestRng::default();
1505
1506 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1508 let committee_round = 0;
1509 let commit_round = 2;
1510 let current_round = commit_round + 1;
1511 let next_round = current_round + 1;
1512
1513 let (round_to_certificates_map, committee) = {
1515 let private_keys = vec![
1516 PrivateKey::new(rng).unwrap(),
1517 PrivateKey::new(rng).unwrap(),
1518 PrivateKey::new(rng).unwrap(),
1519 PrivateKey::new(rng).unwrap(),
1520 ];
1521 let addresses = vec![
1522 Address::try_from(private_keys[0])?,
1523 Address::try_from(private_keys[1])?,
1524 Address::try_from(private_keys[2])?,
1525 Address::try_from(private_keys[3])?,
1526 ];
1527 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1528 committee_round,
1529 addresses,
1530 rng,
1531 );
1532 let mut round_to_certificates_map: IndexMap<
1534 u64,
1535 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1536 > = IndexMap::new();
1537 let mut previous_certificates = IndexSet::with_capacity(4);
1538 for _ in 0..4 {
1540 previous_certificates.insert(sample_batch_certificate(rng));
1541 }
1542 for round in 0..commit_round + 3 {
1543 let mut current_certificates = IndexSet::new();
1544 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1545 IndexSet::new()
1546 } else {
1547 previous_certificates.iter().map(|c| c.id()).collect()
1548 };
1549 let transmission_ids =
1550 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1551 .into_iter()
1552 .collect::<IndexSet<_>>();
1553 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1554 let committee_id = committee.id();
1555 for (i, private_key_1) in private_keys.iter().enumerate() {
1556 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1557 private_key_1,
1558 round,
1559 timestamp,
1560 committee_id,
1561 transmission_ids.clone(),
1562 previous_certificate_ids.clone(),
1563 rng,
1564 )
1565 .unwrap();
1566 let mut signatures = IndexSet::with_capacity(4);
1567 for (j, private_key_2) in private_keys.iter().enumerate() {
1568 if i != j {
1569 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1570 }
1571 }
1572 let certificate =
1573 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1574 current_certificates.insert(certificate);
1575 }
1576 round_to_certificates_map.insert(round, current_certificates.clone());
1578 previous_certificates = current_certificates.clone();
1579 }
1580 (round_to_certificates_map, committee)
1581 };
1582
1583 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1585 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1587 let leader = committee.get_leader(commit_round).unwrap();
1589 let next_leader = committee.get_leader(next_round).unwrap();
1590 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1592 for i in 1..=commit_round {
1593 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1594 if i == commit_round {
1595 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1597 if let Some(c) = leader_certificate {
1598 pre_shutdown_certificates.push(c.clone());
1599 }
1600 continue;
1601 }
1602 pre_shutdown_certificates.extend(certificates);
1603 }
1604 for certificate in pre_shutdown_certificates.iter() {
1605 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1606 }
1607 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1609 Vec::new();
1610 for j in commit_round..=commit_round + 2 {
1611 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1612 post_shutdown_certificates.extend(certificate);
1613 }
1614 for certificate in post_shutdown_certificates.iter() {
1615 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1616 }
1617 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1619 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1620
1621 let account = Account::new(rng)?;
1623 let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1624
1625 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1627
1628 for certificate in pre_shutdown_certificates.clone() {
1630 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1631 }
1632
1633 for certificate in post_shutdown_certificates.clone() {
1635 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1636 }
1637 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1639 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1640 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1641
1642 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1646
1647 let bootup_bft =
1649 BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1650
1651 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1653
1654 for certificate in post_shutdown_certificates.iter() {
1656 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1657 }
1658 for certificate in post_shutdown_certificates.clone() {
1659 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1660 }
1661 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1663 let commit_subdag_metadata_bootup =
1664 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1665 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1666 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1667
1668 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1672
1673 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1675 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1676 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1677 assert!(
1678 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1679 );
1680
1681 for certificate in pre_shutdown_certificates.clone() {
1683 let certificate_round = certificate.round();
1684 let certificate_id = certificate.id();
1685 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1687 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1688 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1691 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1692 }
1693
1694 for certificate in committed_certificates_bootup.clone() {
1696 let certificate_round = certificate.round();
1697 let certificate_id = certificate.id();
1698 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1700 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1701 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1704 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1705 }
1706
1707 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1709
1710 Ok(())
1711 }
1712
1713 #[tokio::test]
1714 #[tracing_test::traced_test]
1715 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1716 let rng = &mut TestRng::default();
1723
1724 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1726 let committee_round = 0;
1727 let commit_round = 2;
1728 let current_round = commit_round + 1;
1729 let next_round = current_round + 1;
1730
1731 let (round_to_certificates_map, committee) = {
1733 let private_keys = vec![
1734 PrivateKey::new(rng).unwrap(),
1735 PrivateKey::new(rng).unwrap(),
1736 PrivateKey::new(rng).unwrap(),
1737 PrivateKey::new(rng).unwrap(),
1738 ];
1739 let addresses = vec![
1740 Address::try_from(private_keys[0])?,
1741 Address::try_from(private_keys[1])?,
1742 Address::try_from(private_keys[2])?,
1743 Address::try_from(private_keys[3])?,
1744 ];
1745 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1746 committee_round,
1747 addresses,
1748 rng,
1749 );
1750 let mut round_to_certificates_map: IndexMap<
1752 u64,
1753 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1754 > = IndexMap::new();
1755 let mut previous_certificates = IndexSet::with_capacity(4);
1756 for _ in 0..4 {
1758 previous_certificates.insert(sample_batch_certificate(rng));
1759 }
1760 for round in 0..=commit_round + 2 {
1761 let mut current_certificates = IndexSet::new();
1762 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1763 IndexSet::new()
1764 } else {
1765 previous_certificates.iter().map(|c| c.id()).collect()
1766 };
1767 let transmission_ids =
1768 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1769 .into_iter()
1770 .collect::<IndexSet<_>>();
1771 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1772 let committee_id = committee.id();
1773 for (i, private_key_1) in private_keys.iter().enumerate() {
1774 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1775 private_key_1,
1776 round,
1777 timestamp,
1778 committee_id,
1779 transmission_ids.clone(),
1780 previous_certificate_ids.clone(),
1781 rng,
1782 )
1783 .unwrap();
1784 let mut signatures = IndexSet::with_capacity(4);
1785 for (j, private_key_2) in private_keys.iter().enumerate() {
1786 if i != j {
1787 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1788 }
1789 }
1790 let certificate =
1791 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1792 current_certificates.insert(certificate);
1793 }
1794 round_to_certificates_map.insert(round, current_certificates.clone());
1796 previous_certificates = current_certificates.clone();
1797 }
1798 (round_to_certificates_map, committee)
1799 };
1800
1801 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1803 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1805 let leader = committee.get_leader(commit_round).unwrap();
1807 let next_leader = committee.get_leader(next_round).unwrap();
1808 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1810 for i in 1..=commit_round {
1811 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1812 if i == commit_round {
1813 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1815 if let Some(c) = leader_certificate {
1816 pre_shutdown_certificates.push(c.clone());
1817 }
1818 continue;
1819 }
1820 pre_shutdown_certificates.extend(certificates);
1821 }
1822 for certificate in pre_shutdown_certificates.iter() {
1823 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1824 }
1825 let account = Account::new(rng)?;
1827 let bootup_bft =
1828 BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1829 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1831 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1833
1834 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1836 Vec::new();
1837 for j in commit_round..=commit_round + 2 {
1838 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1839 post_shutdown_certificates.extend(certificate);
1840 }
1841 for certificate in post_shutdown_certificates.iter() {
1842 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1843 }
1844
1845 for certificate in post_shutdown_certificates.clone() {
1847 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1848 }
1849
1850 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1852 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1853 let committed_certificates = commit_subdag.values().flatten();
1854
1855 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1857 for committed_certificate in committed_certificates.clone() {
1858 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1859 }
1860 }
1861 Ok(())
1862 }
1863}