1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkos_node_sync::{BlockSync, Ping};
34use snarkvm::{
35 console::account::Address,
36 ledger::{
37 block::Transaction,
38 committee::Committee,
39 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
40 puzzle::{Solution, SolutionID},
41 },
42 prelude::{Field, Network, Result, bail, ensure},
43};
44
45use aleo_std::StorageMode;
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50 parking_lot::{Mutex, RwLock},
51 tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56 collections::{BTreeMap, HashSet},
57 future::Future,
58 net::SocketAddr,
59 sync::{
60 Arc,
61 atomic::{AtomicI64, Ordering},
62 },
63};
64#[cfg(not(feature = "locktick"))]
65use tokio::sync::Mutex as TMutex;
66use tokio::{
67 sync::{OnceCell, oneshot},
68 task::JoinHandle,
69};
70
71#[derive(Clone)]
72pub struct BFT<N: Network> {
73 primary: Primary<N>,
75 dag: Arc<RwLock<DAG<N>>>,
77 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
79 leader_certificate_timer: Arc<AtomicI64>,
81 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
83 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
85 lock: Arc<TMutex<()>>,
87}
88
89impl<N: Network> BFT<N> {
90 #[allow(clippy::too_many_arguments)]
92 pub fn new(
93 account: Account<N>,
94 storage: Storage<N>,
95 ledger: Arc<dyn LedgerService<N>>,
96 block_sync: Arc<BlockSync<N>>,
97 ip: Option<SocketAddr>,
98 trusted_validators: &[SocketAddr],
99 trusted_peers_only: bool,
100 storage_mode: StorageMode,
101 dev: Option<u16>,
102 ) -> Result<Self> {
103 Ok(Self {
104 primary: Primary::new(
105 account,
106 storage,
107 ledger,
108 block_sync,
109 ip,
110 trusted_validators,
111 trusted_peers_only,
112 storage_mode,
113 dev,
114 )?,
115 dag: Default::default(),
116 leader_certificate: Default::default(),
117 leader_certificate_timer: Default::default(),
118 consensus_sender: Default::default(),
119 handles: Default::default(),
120 lock: Default::default(),
121 })
122 }
123
124 pub async fn run(
129 &mut self,
130 ping: Option<Arc<Ping<N>>>,
131 consensus_sender: Option<ConsensusSender<N>>,
132 primary_sender: PrimarySender<N>,
133 primary_receiver: PrimaryReceiver<N>,
134 ) -> Result<()> {
135 info!("Starting the BFT instance...");
136 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
138 self.start_handlers(bft_receiver);
140 self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
142 if let Some(consensus_sender) = consensus_sender {
145 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
146 }
147 Ok(())
148 }
149
150 pub fn is_synced(&self) -> bool {
152 self.primary.is_synced()
153 }
154
155 pub const fn primary(&self) -> &Primary<N> {
157 &self.primary
158 }
159
160 pub const fn storage(&self) -> &Storage<N> {
162 self.primary.storage()
163 }
164
165 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
167 self.primary.ledger()
168 }
169
170 pub fn leader(&self) -> Option<Address<N>> {
172 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
173 }
174
175 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
177 &self.leader_certificate
178 }
179}
180
181impl<N: Network> BFT<N> {
182 pub fn num_unconfirmed_transmissions(&self) -> usize {
184 self.primary.num_unconfirmed_transmissions()
185 }
186
187 pub fn num_unconfirmed_ratifications(&self) -> usize {
189 self.primary.num_unconfirmed_ratifications()
190 }
191
192 pub fn num_unconfirmed_solutions(&self) -> usize {
194 self.primary.num_unconfirmed_solutions()
195 }
196
197 pub fn num_unconfirmed_transactions(&self) -> usize {
199 self.primary.num_unconfirmed_transactions()
200 }
201}
202
203impl<N: Network> BFT<N> {
204 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
206 self.primary.worker_transmission_ids()
207 }
208
209 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
211 self.primary.worker_transmissions()
212 }
213
214 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
216 self.primary.worker_solutions()
217 }
218
219 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
221 self.primary.worker_transactions()
222 }
223}
224
225impl<N: Network> BFT<N> {
226 fn update_to_next_round(&self, current_round: u64) -> bool {
228 let storage_round = self.storage().current_round();
230 if current_round < storage_round {
231 debug!(
232 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
233 );
234 return false;
235 }
236
237 let is_ready = match current_round % 2 == 0 {
239 true => self.update_leader_certificate_to_even_round(current_round),
240 false => self.is_leader_quorum_or_nonleaders_available(current_round),
241 };
242
243 #[cfg(feature = "metrics")]
244 {
245 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
246 if start > 0 {
248 let end = now();
249 let elapsed = std::time::Duration::from_secs((end - start) as u64);
250 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
251 }
252 }
253
254 if current_round % 2 == 0 {
256 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
258 if !is_ready {
260 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
261 }
262 let leader_round = leader_certificate.round();
264 match leader_round == current_round {
265 true => {
266 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
267 #[cfg(feature = "metrics")]
268 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
269 }
270 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
271 }
272 } else {
273 match is_ready {
274 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
275 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
276 }
277 }
278 }
279
280 if is_ready {
282 if let Err(e) = self.storage().increment_to_next_round(current_round) {
284 warn!("BFT failed to increment to the next round from round {current_round} - {e}");
285 return false;
286 }
287 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
289 }
290
291 is_ready
292 }
293
294 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
300 let current_round = self.storage().current_round();
302 if current_round != even_round {
304 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
305 return false;
306 }
307
308 if current_round % 2 != 0 || current_round < 2 {
310 error!("BFT cannot update the leader certificate in an odd round");
311 return false;
312 }
313
314 let current_certificates = self.storage().get_certificates_for_round(current_round);
316 if current_certificates.is_empty() {
318 *self.leader_certificate.write() = None;
320 return false;
321 }
322
323 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
325 Ok(committee) => committee,
326 Err(e) => {
327 error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
328 return false;
329 }
330 };
331 let leader = match self.ledger().latest_leader() {
333 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
334 _ => {
335 let computed_leader = match committee_lookback.get_leader(current_round) {
337 Ok(leader) => leader,
338 Err(e) => {
339 error!("BFT failed to compute the leader for the even round {current_round} - {e}");
340 return false;
341 }
342 };
343
344 self.ledger().update_latest_leader(current_round, computed_leader);
346
347 computed_leader
348 }
349 };
350 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
352 *self.leader_certificate.write() = leader_certificate.cloned();
353
354 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
355 }
356
357 fn is_even_round_ready_for_next_round(
361 &self,
362 certificates: IndexSet<BatchCertificate<N>>,
363 committee: Committee<N>,
364 current_round: u64,
365 ) -> bool {
366 let authors = certificates.into_iter().map(|c| c.author()).collect();
368 if !committee.is_quorum_threshold_reached(&authors) {
370 trace!("BFT failed to reach quorum threshold in even round {current_round}");
371 return false;
372 }
373 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
375 if leader_certificate.round() == current_round {
376 return true;
377 }
378 }
379 if self.is_timer_expired() {
381 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
382 return true;
383 }
384 false
386 }
387
388 fn is_timer_expired(&self) -> bool {
392 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
393 }
394
395 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
400 let current_round = self.storage().current_round();
402 if current_round != odd_round {
404 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
405 return false;
406 }
407 if current_round % 2 != 1 {
409 error!("BFT does not compute stakes for the leader certificate in an even round");
410 return false;
411 }
412 let current_certificates = self.storage().get_certificates_for_round(current_round);
414 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
416 Ok(committee) => committee,
417 Err(e) => {
418 error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
419 return false;
420 }
421 };
422 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
424 if !committee_lookback.is_quorum_threshold_reached(&authors) {
426 trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
427 return false;
428 }
429 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
431 return true;
433 };
434 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
436 leader_certificate.id(),
437 current_certificates,
438 &committee_lookback,
439 );
440 stake_with_leader >= committee_lookback.availability_threshold()
442 || stake_without_leader >= committee_lookback.quorum_threshold()
443 || self.is_timer_expired()
444 }
445
446 fn compute_stake_for_leader_certificate(
448 &self,
449 leader_certificate_id: Field<N>,
450 current_certificates: IndexSet<BatchCertificate<N>>,
451 current_committee: &Committee<N>,
452 ) -> (u64, u64) {
453 if current_certificates.is_empty() {
455 return (0, 0);
456 }
457
458 let mut stake_with_leader = 0u64;
460 let mut stake_without_leader = 0u64;
462 for certificate in current_certificates {
464 let stake = current_committee.get_stake(certificate.author());
466 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
468 true => stake_with_leader = stake_with_leader.saturating_add(stake),
470 false => stake_without_leader = stake_without_leader.saturating_add(stake),
472 }
473 }
474 (stake_with_leader, stake_without_leader)
476 }
477}
478
479impl<N: Network> BFT<N> {
480 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
482 &self,
483 certificate: BatchCertificate<N>,
484 ) -> Result<()> {
485 let _lock = self.lock.lock().await;
487
488 let certificate_round = certificate.round();
490
491 self.dag.write().insert(certificate);
493
494 let commit_round = certificate_round.saturating_sub(1);
496
497 if commit_round % 2 != 0 || commit_round < 2 {
500 return Ok(());
501 }
502 if commit_round <= self.dag.read().last_committed_round() {
504 return Ok(());
505 }
506
507 trace!("Checking if the leader is ready to be committed for round {commit_round}...");
509
510 let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
512 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
513 };
514
515 let leader = match self.ledger().latest_leader() {
517 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
518 _ => {
519 let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
521 bail!("BFT failed to compute the leader for commit round {commit_round}");
522 };
523
524 self.ledger().update_latest_leader(commit_round, computed_leader);
526
527 computed_leader
528 }
529 };
530
531 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
533 else {
534 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
535 return Ok(());
536 };
537 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
539 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
541 };
542 let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
544 else {
545 bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
546 };
547 let authors = certificates
549 .values()
550 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
551 true => Some(c.author()),
552 false => None,
553 })
554 .collect();
555 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
557 trace!("BFT is not ready to commit {commit_round}");
559 return Ok(());
560 }
561
562 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
564
565 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
567 }
568
569 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
571 &self,
572 leader_certificate: BatchCertificate<N>,
573 ) -> Result<()> {
574 let latest_leader_round = leader_certificate.round();
576 let mut leader_certificates = vec![leader_certificate.clone()];
579 {
580 let leader_round = leader_certificate.round();
582
583 let mut current_certificate = leader_certificate;
584 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
585 {
586 let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
588 Ok(committee) => committee,
589 Err(e) => {
590 bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
591 }
592 };
593 let leader = match self.ledger().latest_leader() {
595 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
596 _ => {
597 let computed_leader = match previous_committee_lookback.get_leader(round) {
599 Ok(leader) => leader,
600 Err(e) => {
601 bail!("BFT failed to compute the leader for the even round {round} - {e}");
602 }
603 };
604
605 self.ledger().update_latest_leader(round, computed_leader);
607
608 computed_leader
609 }
610 };
611 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
613 else {
614 continue;
615 };
616 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
618 leader_certificates.push(previous_certificate.clone());
620 current_certificate = previous_certificate;
622 }
623 }
624 }
625
626 for leader_certificate in leader_certificates.into_iter().rev() {
628 let leader_round = leader_certificate.round();
630 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
632 Ok(subdag) => subdag,
633 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
634 };
635 if !IS_SYNCING {
637 let mut transmissions = IndexMap::new();
639 let mut seen_transaction_ids = IndexSet::new();
641 let mut seen_solution_ids = IndexSet::new();
643 for certificate in commit_subdag.values().flatten() {
645 for transmission_id in certificate.transmission_ids() {
647 match transmission_id {
651 TransmissionID::Solution(solution_id, _) => {
652 if seen_solution_ids.contains(&solution_id) {
654 continue;
655 }
656 }
657 TransmissionID::Transaction(transaction_id, _) => {
658 if seen_transaction_ids.contains(transaction_id) {
660 continue;
661 }
662 }
663 TransmissionID::Ratification => {
664 bail!("Ratifications are currently not supported in the BFT.")
665 }
666 }
667 if transmissions.contains_key(transmission_id) {
669 continue;
670 }
671 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
674 continue;
675 }
676 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
678 bail!(
679 "BFT failed to retrieve transmission '{}.{}' from round {}",
680 fmt_id(transmission_id),
681 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
682 certificate.round()
683 );
684 };
685 match transmission_id {
687 TransmissionID::Solution(id, _) => {
688 seen_solution_ids.insert(id);
689 }
690 TransmissionID::Transaction(id, _) => {
691 seen_transaction_ids.insert(id);
692 }
693 TransmissionID::Ratification => {}
694 }
695 transmissions.insert(*transmission_id, transmission);
697 }
698 }
699 let subdag = Subdag::from(commit_subdag.clone())?;
702 let anchor_round = subdag.anchor_round();
704 let num_transmissions = transmissions.len();
706 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
708
709 ensure!(
711 anchor_round == leader_round,
712 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
713 );
714
715 if let Some(consensus_sender) = self.consensus_sender.get() {
717 let (callback_sender, callback_receiver) = oneshot::channel();
719 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
721 match callback_receiver.await {
723 Ok(Ok(())) => (), Ok(Err(e)) => {
725 error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
726 return Ok(());
727 }
728 Err(e) => {
729 error!("BFT failed to receive the callback for round {anchor_round} - {e}");
730 return Ok(());
731 }
732 }
733 }
734
735 info!(
736 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
737 );
738 }
739
740 let mut dag_write = self.dag.write();
742 for certificate in commit_subdag.values().flatten() {
743 dag_write.commit(certificate, self.storage().max_gc_rounds());
744 }
745
746 #[cfg(feature = "telemetry")]
748 self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
749 }
750
751 self.storage().garbage_collect_certificates(latest_leader_round);
767
768 Ok(())
769 }
770
771 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
773 &self,
774 leader_certificate: BatchCertificate<N>,
775 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
776 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
778 let mut already_ordered = HashSet::new();
780 let mut buffer = vec![leader_certificate];
782 while let Some(certificate) = buffer.pop() {
784 commit.entry(certificate.round()).or_default().insert(certificate.clone());
786
787 let previous_round = certificate.round().saturating_sub(1);
792 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
793 continue;
794 }
795 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
799 if already_ordered.contains(previous_certificate_id) {
801 continue;
802 }
803 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
805 continue;
806 }
807 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
809 continue;
810 }
811
812 let previous_certificate = {
814 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
816 Some(previous_certificate) => previous_certificate,
818 None => match self.storage().get_certificate(*previous_certificate_id) {
820 Some(previous_certificate) => previous_certificate,
822 None => bail!(
824 "Missing previous certificate {} for round {previous_round}",
825 fmt_id(previous_certificate_id)
826 ),
827 },
828 }
829 };
830 already_ordered.insert(previous_certificate.id());
832 buffer.push(previous_certificate);
834 }
835 }
836 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
838 Ok(commit)
840 }
841
842 fn is_linked(
844 &self,
845 previous_certificate: BatchCertificate<N>,
846 current_certificate: BatchCertificate<N>,
847 ) -> Result<bool> {
848 let mut traversal = vec![current_certificate.clone()];
850 for round in (previous_certificate.round()..current_certificate.round()).rev() {
852 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
854 bail!("BFT failed to retrieve the certificates for past round {round}");
857 };
858 traversal = certificates
860 .into_values()
861 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
862 .collect();
863 }
864 Ok(traversal.contains(&previous_certificate))
865 }
866}
867
868impl<N: Network> BFT<N> {
869 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
871 let BFTReceiver {
872 mut rx_primary_round,
873 mut rx_primary_certificate,
874 mut rx_sync_bft_dag_at_bootup,
875 mut rx_sync_bft,
876 } = bft_receiver;
877
878 let self_ = self.clone();
880 self.spawn(async move {
881 while let Some((current_round, callback)) = rx_primary_round.recv().await {
882 callback.send(self_.update_to_next_round(current_round)).ok();
883 }
884 });
885
886 let self_ = self.clone();
888 self.spawn(async move {
889 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
890 let result = self_.update_dag::<true, false>(certificate).await;
892 callback.send(result).ok();
895 }
896 });
897
898 let self_ = self.clone();
900 self.spawn(async move {
901 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
902 self_.sync_bft_dag_at_bootup(certificates).await;
903 }
904 });
905
906 let self_ = self.clone();
908 self.spawn(async move {
909 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
910 let result = self_.update_dag::<true, true>(certificate).await;
912 callback.send(result).ok();
915 }
916 });
917 }
918
919 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
926 let mut dag = self.dag.write();
928
929 for certificate in certificates {
931 dag.commit(&certificate, self.storage().max_gc_rounds());
932 }
933 }
934
935 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
937 self.handles.lock().push(tokio::spawn(future));
938 }
939
940 pub async fn shut_down(&self) {
942 info!("Shutting down the BFT...");
943 let _lock = self.lock.lock().await;
945 self.primary.shut_down().await;
947 self.handles.lock().iter().for_each(|handle| handle.abort());
949 }
950}
951
952#[cfg(test)]
953mod tests {
954 use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
955 use snarkos_account::Account;
956 use snarkos_node_bft_ledger_service::MockLedgerService;
957 use snarkos_node_bft_storage_service::BFTMemoryService;
958 use snarkos_node_sync::BlockSync;
959 use snarkvm::{
960 console::account::{Address, PrivateKey},
961 ledger::{
962 committee::Committee,
963 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
964 },
965 utilities::TestRng,
966 };
967
968 use aleo_std::StorageMode;
969 use anyhow::Result;
970 use indexmap::{IndexMap, IndexSet};
971 use std::sync::Arc;
972
973 type CurrentNetwork = snarkvm::console::network::MainnetV0;
974
975 fn sample_test_instance(
977 committee_round: Option<u64>,
978 max_gc_rounds: u64,
979 rng: &mut TestRng,
980 ) -> (
981 Committee<CurrentNetwork>,
982 Account<CurrentNetwork>,
983 Arc<MockLedgerService<CurrentNetwork>>,
984 Storage<CurrentNetwork>,
985 ) {
986 let committee = match committee_round {
987 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
988 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
989 };
990 let account = Account::new(rng).unwrap();
991 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
992 let transmissions = Arc::new(BFTMemoryService::new());
993 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
994
995 (committee, account, ledger, storage)
996 }
997
998 fn initialize_bft(
1000 account: Account<CurrentNetwork>,
1001 storage: Storage<CurrentNetwork>,
1002 ledger: Arc<MockLedgerService<CurrentNetwork>>,
1003 ) -> anyhow::Result<BFT<CurrentNetwork>> {
1004 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1006 BFT::new(
1008 account.clone(),
1009 storage.clone(),
1010 ledger.clone(),
1011 block_sync,
1012 None,
1013 &[],
1014 false,
1015 StorageMode::new_test(None),
1016 None,
1017 )
1018 }
1019
1020 #[test]
1021 #[tracing_test::traced_test]
1022 fn test_is_leader_quorum_odd() -> Result<()> {
1023 let rng = &mut TestRng::default();
1024
1025 let mut certificates = IndexSet::new();
1027 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1028 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1029 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1030 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1031
1032 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1034 1,
1035 vec![
1036 certificates[0].author(),
1037 certificates[1].author(),
1038 certificates[2].author(),
1039 certificates[3].author(),
1040 ],
1041 rng,
1042 );
1043
1044 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1046 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1048 let account = Account::new(rng)?;
1050 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1052 assert!(bft.is_timer_expired());
1053 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1055 assert!(!result);
1057 for certificate in certificates.iter() {
1059 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1060 }
1061 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1063 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1066 *bft.leader_certificate.write() = Some(leader_certificate);
1067 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1069 assert!(result); Ok(())
1072 }
1073
1074 #[test]
1075 #[tracing_test::traced_test]
1076 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1077 let rng = &mut TestRng::default();
1078
1079 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1081 assert_eq!(committee.starting_round(), 1);
1082 assert_eq!(storage.current_round(), 1);
1083 assert_eq!(storage.max_gc_rounds(), 10);
1084
1085 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1087 assert!(bft.is_timer_expired());
1088
1089 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1092 assert!(!result);
1093 Ok(())
1094 }
1095
1096 #[test]
1097 #[tracing_test::traced_test]
1098 fn test_is_leader_quorum_even() -> Result<()> {
1099 let rng = &mut TestRng::default();
1100
1101 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1103 assert_eq!(committee.starting_round(), 2);
1104 assert_eq!(storage.current_round(), 2);
1105 assert_eq!(storage.max_gc_rounds(), 10);
1106
1107 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1109 assert!(bft.is_timer_expired());
1110
1111 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1113 assert!(!result);
1114 Ok(())
1115 }
1116
1117 #[test]
1118 #[tracing_test::traced_test]
1119 fn test_is_even_round_ready() -> Result<()> {
1120 let rng = &mut TestRng::default();
1121
1122 let mut certificates = IndexSet::new();
1124 certificates.insert(sample_batch_certificate_for_round(2, rng));
1125 certificates.insert(sample_batch_certificate_for_round(2, rng));
1126 certificates.insert(sample_batch_certificate_for_round(2, rng));
1127 certificates.insert(sample_batch_certificate_for_round(2, rng));
1128
1129 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1131 2,
1132 vec![
1133 certificates[0].author(),
1134 certificates[1].author(),
1135 certificates[2].author(),
1136 certificates[3].author(),
1137 ],
1138 rng,
1139 );
1140
1141 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1143 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1145 let account = Account::new(rng)?;
1147
1148 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1150 assert!(bft.is_timer_expired());
1151
1152 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1154 *bft.leader_certificate.write() = Some(leader_certificate);
1155 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1156 assert!(!result);
1158 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1160 assert!(result);
1161
1162 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1164 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1166 if !bft_timer.is_timer_expired() {
1167 assert!(!result);
1168 }
1169 let leader_certificate_timeout =
1171 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1172 std::thread::sleep(leader_certificate_timeout);
1173 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1175 if bft_timer.is_timer_expired() {
1176 assert!(result);
1177 } else {
1178 assert!(!result);
1179 }
1180
1181 Ok(())
1182 }
1183
1184 #[test]
1185 #[tracing_test::traced_test]
1186 fn test_update_leader_certificate_odd() -> Result<()> {
1187 let rng = &mut TestRng::default();
1188
1189 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1191 assert_eq!(storage.max_gc_rounds(), 10);
1192
1193 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1195 assert!(bft.is_timer_expired());
1196
1197 let result = bft.update_leader_certificate_to_even_round(1);
1199 assert!(!result);
1200 Ok(())
1201 }
1202
1203 #[test]
1204 #[tracing_test::traced_test]
1205 fn test_update_leader_certificate_bad_round() -> Result<()> {
1206 let rng = &mut TestRng::default();
1207
1208 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1210 assert_eq!(storage.max_gc_rounds(), 10);
1211
1212 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1214
1215 let result = bft.update_leader_certificate_to_even_round(6);
1217 assert!(!result);
1218 Ok(())
1219 }
1220
1221 #[test]
1222 #[tracing_test::traced_test]
1223 fn test_update_leader_certificate_even() -> Result<()> {
1224 let rng = &mut TestRng::default();
1225
1226 let current_round = 3;
1228
1229 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1231 current_round,
1232 rng,
1233 );
1234
1235 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1237 2,
1238 vec![
1239 certificates[0].author(),
1240 certificates[1].author(),
1241 certificates[2].author(),
1242 certificates[3].author(),
1243 ],
1244 rng,
1245 );
1246
1247 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1249
1250 let transmissions = Arc::new(BFTMemoryService::new());
1252 let storage = Storage::new(ledger.clone(), transmissions, 10);
1253 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1254 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1255 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1256 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1257 assert_eq!(storage.current_round(), 2);
1258
1259 let leader = committee.get_leader(2).unwrap();
1261 let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1262
1263 let account = Account::new(rng)?;
1265 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1266
1267 *bft.leader_certificate.write() = Some(leader_certificate);
1269
1270 let result = bft.update_leader_certificate_to_even_round(2);
1273 assert!(result);
1274
1275 Ok(())
1276 }
1277
1278 #[tokio::test]
1279 #[tracing_test::traced_test]
1280 async fn test_order_dag_with_dfs() -> Result<()> {
1281 let rng = &mut TestRng::default();
1282
1283 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1285
1286 let previous_round = 2; let current_round = previous_round + 1;
1289
1290 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1292 current_round,
1293 rng,
1294 );
1295
1296 {
1300 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1302 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1304
1305 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1307
1308 for certificate in previous_certificates.clone() {
1310 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1311 }
1312
1313 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1315 assert!(result.is_ok());
1316 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1317 assert_eq!(candidate_certificates.len(), 1);
1318 let expected_certificates = vec![certificate.clone()];
1319 assert_eq!(
1320 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1321 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1322 );
1323 assert_eq!(candidate_certificates, expected_certificates);
1324 }
1325
1326 {
1330 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1332 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1334
1335 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1337
1338 for certificate in previous_certificates.clone() {
1340 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1341 }
1342
1343 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1345 assert!(result.is_ok());
1346 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1347 assert_eq!(candidate_certificates.len(), 5);
1348 let expected_certificates = vec![
1349 previous_certificates[0].clone(),
1350 previous_certificates[1].clone(),
1351 previous_certificates[2].clone(),
1352 previous_certificates[3].clone(),
1353 certificate,
1354 ];
1355 assert_eq!(
1356 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1357 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1358 );
1359 assert_eq!(candidate_certificates, expected_certificates);
1360 }
1361
1362 Ok(())
1363 }
1364
1365 #[test]
1366 #[tracing_test::traced_test]
1367 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1368 let rng = &mut TestRng::default();
1369
1370 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1372 assert_eq!(committee.starting_round(), 1);
1373 assert_eq!(storage.current_round(), 1);
1374 assert_eq!(storage.max_gc_rounds(), 1);
1375
1376 let previous_round = 2; let current_round = previous_round + 1;
1379
1380 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1382 current_round,
1383 rng,
1384 );
1385 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1387
1388 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1392
1393 let error_msg = format!(
1395 "Missing previous certificate {} for round {previous_round}",
1396 crate::helpers::fmt_id(previous_certificate_ids[3]),
1397 );
1398
1399 let result = bft.order_dag_with_dfs::<false>(certificate);
1401 assert!(result.is_err());
1402 assert_eq!(result.unwrap_err().to_string(), error_msg);
1403 Ok(())
1404 }
1405
1406 #[tokio::test]
1407 #[tracing_test::traced_test]
1408 async fn test_bft_gc_on_commit() -> Result<()> {
1409 let rng = &mut TestRng::default();
1410
1411 let max_gc_rounds = 1;
1413 let committee_round = 0;
1414 let commit_round = 2;
1415 let current_round = commit_round + 1;
1416
1417 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1419 current_round,
1420 rng,
1421 );
1422
1423 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1425 committee_round,
1426 vec![
1427 certificates[0].author(),
1428 certificates[1].author(),
1429 certificates[2].author(),
1430 certificates[3].author(),
1431 ],
1432 rng,
1433 );
1434
1435 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1437
1438 let transmissions = Arc::new(BFTMemoryService::new());
1440 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1441 for certificate in certificates.iter() {
1443 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1444 }
1445
1446 let leader = committee.get_leader(commit_round).unwrap();
1448 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1449
1450 let account = Account::new(rng)?;
1452 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1453
1454 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1455
1456 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1458
1459 for certificate in certificates {
1461 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1462 }
1463
1464 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1466
1467 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1469
1470 Ok(())
1471 }
1472
1473 #[tokio::test]
1474 #[tracing_test::traced_test]
1475 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1476 let rng = &mut TestRng::default();
1477
1478 let max_gc_rounds = 1;
1480 let committee_round = 0;
1481 let commit_round = 2;
1482 let current_round = commit_round + 1;
1483
1484 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1486 current_round,
1487 rng,
1488 );
1489
1490 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1492 committee_round,
1493 vec![
1494 certificates[0].author(),
1495 certificates[1].author(),
1496 certificates[2].author(),
1497 certificates[3].author(),
1498 ],
1499 rng,
1500 );
1501
1502 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1504
1505 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1507 for certificate in certificates.iter() {
1509 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1510 }
1511
1512 let leader = committee.get_leader(commit_round).unwrap();
1514 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1515
1516 let account = Account::new(rng)?;
1518 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1519
1520 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1522
1523 for certificate in certificates.clone() {
1525 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1526 }
1527
1528 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1530
1531 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1535 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1537
1538 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1540
1541 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1543
1544 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1546 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1547
1548 for certificate in certificates {
1550 let certificate_round = certificate.round();
1551 let certificate_id = certificate.id();
1552 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1554 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1557 }
1558
1559 Ok(())
1560 }
1561
1562 #[tokio::test]
1563 #[tracing_test::traced_test]
1564 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1565 let rng = &mut TestRng::default();
1572
1573 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1575 let committee_round = 0;
1576 let commit_round = 2;
1577 let current_round = commit_round + 1;
1578 let next_round = current_round + 1;
1579
1580 let (round_to_certificates_map, committee) = {
1582 let private_keys = vec![
1583 PrivateKey::new(rng).unwrap(),
1584 PrivateKey::new(rng).unwrap(),
1585 PrivateKey::new(rng).unwrap(),
1586 PrivateKey::new(rng).unwrap(),
1587 ];
1588 let addresses = vec![
1589 Address::try_from(private_keys[0])?,
1590 Address::try_from(private_keys[1])?,
1591 Address::try_from(private_keys[2])?,
1592 Address::try_from(private_keys[3])?,
1593 ];
1594 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1595 committee_round,
1596 addresses,
1597 rng,
1598 );
1599 let mut round_to_certificates_map: IndexMap<
1601 u64,
1602 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1603 > = IndexMap::new();
1604 let mut previous_certificates = IndexSet::with_capacity(4);
1605 for _ in 0..4 {
1607 previous_certificates.insert(sample_batch_certificate(rng));
1608 }
1609 for round in 0..commit_round + 3 {
1610 let mut current_certificates = IndexSet::new();
1611 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1612 IndexSet::new()
1613 } else {
1614 previous_certificates.iter().map(|c| c.id()).collect()
1615 };
1616 let transmission_ids =
1617 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1618 .into_iter()
1619 .collect::<IndexSet<_>>();
1620 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1621 let committee_id = committee.id();
1622 for (i, private_key_1) in private_keys.iter().enumerate() {
1623 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1624 private_key_1,
1625 round,
1626 timestamp,
1627 committee_id,
1628 transmission_ids.clone(),
1629 previous_certificate_ids.clone(),
1630 rng,
1631 )
1632 .unwrap();
1633 let mut signatures = IndexSet::with_capacity(4);
1634 for (j, private_key_2) in private_keys.iter().enumerate() {
1635 if i != j {
1636 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1637 }
1638 }
1639 let certificate =
1640 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1641 current_certificates.insert(certificate);
1642 }
1643 round_to_certificates_map.insert(round, current_certificates.clone());
1645 previous_certificates = current_certificates.clone();
1646 }
1647 (round_to_certificates_map, committee)
1648 };
1649
1650 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1652 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1654 let leader = committee.get_leader(commit_round).unwrap();
1656 let next_leader = committee.get_leader(next_round).unwrap();
1657 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1659 for i in 1..=commit_round {
1660 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1661 if i == commit_round {
1662 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1664 if let Some(c) = leader_certificate {
1665 pre_shutdown_certificates.push(c.clone());
1666 }
1667 continue;
1668 }
1669 pre_shutdown_certificates.extend(certificates);
1670 }
1671 for certificate in pre_shutdown_certificates.iter() {
1672 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1673 }
1674 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1676 Vec::new();
1677 for j in commit_round..=commit_round + 2 {
1678 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1679 post_shutdown_certificates.extend(certificate);
1680 }
1681 for certificate in post_shutdown_certificates.iter() {
1682 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1683 }
1684 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1686 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1687
1688 let account = Account::new(rng)?;
1690 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1691
1692 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1694
1695 for certificate in pre_shutdown_certificates.clone() {
1697 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1698 }
1699
1700 for certificate in post_shutdown_certificates.clone() {
1702 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1703 }
1704 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1706 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1707 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1708
1709 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1713
1714 let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1716
1717 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1719
1720 for certificate in post_shutdown_certificates.iter() {
1722 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1723 }
1724 for certificate in post_shutdown_certificates.clone() {
1725 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1726 }
1727 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1729 let commit_subdag_metadata_bootup =
1730 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1731 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1732 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1733
1734 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1738
1739 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1741 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1742 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1743 assert!(
1744 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1745 );
1746
1747 for certificate in pre_shutdown_certificates.clone() {
1749 let certificate_round = certificate.round();
1750 let certificate_id = certificate.id();
1751 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1753 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1757 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758 }
1759
1760 for certificate in committed_certificates_bootup.clone() {
1762 let certificate_round = certificate.round();
1763 let certificate_id = certificate.id();
1764 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1766 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1767 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1770 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1771 }
1772
1773 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1775
1776 Ok(())
1777 }
1778
1779 #[tokio::test]
1780 #[tracing_test::traced_test]
1781 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1782 let rng = &mut TestRng::default();
1789
1790 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1792 let committee_round = 0;
1793 let commit_round = 2;
1794 let current_round = commit_round + 1;
1795 let next_round = current_round + 1;
1796
1797 let (round_to_certificates_map, committee) = {
1799 let private_keys = vec![
1800 PrivateKey::new(rng).unwrap(),
1801 PrivateKey::new(rng).unwrap(),
1802 PrivateKey::new(rng).unwrap(),
1803 PrivateKey::new(rng).unwrap(),
1804 ];
1805 let addresses = vec![
1806 Address::try_from(private_keys[0])?,
1807 Address::try_from(private_keys[1])?,
1808 Address::try_from(private_keys[2])?,
1809 Address::try_from(private_keys[3])?,
1810 ];
1811 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1812 committee_round,
1813 addresses,
1814 rng,
1815 );
1816 let mut round_to_certificates_map: IndexMap<
1818 u64,
1819 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1820 > = IndexMap::new();
1821 let mut previous_certificates = IndexSet::with_capacity(4);
1822 for _ in 0..4 {
1824 previous_certificates.insert(sample_batch_certificate(rng));
1825 }
1826 for round in 0..=commit_round + 2 {
1827 let mut current_certificates = IndexSet::new();
1828 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1829 IndexSet::new()
1830 } else {
1831 previous_certificates.iter().map(|c| c.id()).collect()
1832 };
1833 let transmission_ids =
1834 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1835 .into_iter()
1836 .collect::<IndexSet<_>>();
1837 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1838 let committee_id = committee.id();
1839 for (i, private_key_1) in private_keys.iter().enumerate() {
1840 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1841 private_key_1,
1842 round,
1843 timestamp,
1844 committee_id,
1845 transmission_ids.clone(),
1846 previous_certificate_ids.clone(),
1847 rng,
1848 )
1849 .unwrap();
1850 let mut signatures = IndexSet::with_capacity(4);
1851 for (j, private_key_2) in private_keys.iter().enumerate() {
1852 if i != j {
1853 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1854 }
1855 }
1856 let certificate =
1857 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1858 current_certificates.insert(certificate);
1859 }
1860 round_to_certificates_map.insert(round, current_certificates.clone());
1862 previous_certificates = current_certificates.clone();
1863 }
1864 (round_to_certificates_map, committee)
1865 };
1866
1867 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1869 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1871 let leader = committee.get_leader(commit_round).unwrap();
1873 let next_leader = committee.get_leader(next_round).unwrap();
1874 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1876 for i in 1..=commit_round {
1877 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1878 if i == commit_round {
1879 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1881 if let Some(c) = leader_certificate {
1882 pre_shutdown_certificates.push(c.clone());
1883 }
1884 continue;
1885 }
1886 pre_shutdown_certificates.extend(certificates);
1887 }
1888 for certificate in pre_shutdown_certificates.iter() {
1889 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1890 }
1891 let account = Account::new(rng)?;
1893 let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1894
1895 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1897 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1899
1900 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1902 Vec::new();
1903 for j in commit_round..=commit_round + 2 {
1904 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1905 post_shutdown_certificates.extend(certificate);
1906 }
1907 for certificate in post_shutdown_certificates.iter() {
1908 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1909 }
1910
1911 for certificate in post_shutdown_certificates.clone() {
1913 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1914 }
1915
1916 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1918 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1919 let committed_certificates = commit_subdag.values().flatten();
1920
1921 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1923 for committed_certificate in committed_certificates.clone() {
1924 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1925 }
1926 }
1927 Ok(())
1928 }
1929}