1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkos_node_sync::{BlockSync, Ping};
34use snarkvm::{
35 console::account::Address,
36 ledger::{
37 block::Transaction,
38 committee::Committee,
39 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
40 puzzle::{Solution, SolutionID},
41 },
42 prelude::{Field, Network, Result, bail, ensure},
43};
44
45use aleo_std::StorageMode;
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50 parking_lot::{Mutex, RwLock},
51 tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56 collections::{BTreeMap, HashSet},
57 future::Future,
58 net::SocketAddr,
59 sync::{
60 Arc,
61 atomic::{AtomicI64, Ordering},
62 },
63};
64#[cfg(not(feature = "locktick"))]
65use tokio::sync::Mutex as TMutex;
66use tokio::{
67 sync::{OnceCell, oneshot},
68 task::JoinHandle,
69};
70
/// The BFT instance: a [`Primary`] bundled with the certificate DAG and the
/// leader-election state that is used to advance rounds and commit subdags.
///
/// Every field except `primary` is behind an `Arc`, and the struct is cloned
/// into each handler task spawned by `start_handlers`.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary instance that this BFT wraps and delegates to.
    primary: Primary<N>,
    /// The DAG of batch certificates, shared behind a read-write lock.
    dag: Arc<RwLock<DAG<N>>>,
    /// The leader's batch certificate selected for the current even round, if one was found.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timestamp (as returned by `now()`) stored when the BFT last advanced to the next round.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The sender used to hand committed subdags to consensus; set at most once, in `run`.
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// The handles of the tasks spawned by this instance; aborted in `shut_down`.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// An async lock acquired by `update_dag` and `shut_down`, so that DAG updates run one at a time.
    lock: Arc<TMutex<()>>,
}
88
89impl<N: Network> BFT<N> {
90 #[allow(clippy::too_many_arguments)]
92 pub fn new(
93 account: Account<N>,
94 storage: Storage<N>,
95 ledger: Arc<dyn LedgerService<N>>,
96 block_sync: Arc<BlockSync<N>>,
97 ip: Option<SocketAddr>,
98 trusted_validators: &[SocketAddr],
99 storage_mode: StorageMode,
100 dev: Option<u16>,
101 ) -> Result<Self> {
102 Ok(Self {
103 primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode, dev)?,
104 dag: Default::default(),
105 leader_certificate: Default::default(),
106 leader_certificate_timer: Default::default(),
107 consensus_sender: Default::default(),
108 handles: Default::default(),
109 lock: Default::default(),
110 })
111 }
112
113 pub async fn run(
118 &mut self,
119 ping: Option<Arc<Ping<N>>>,
120 consensus_sender: Option<ConsensusSender<N>>,
121 primary_sender: PrimarySender<N>,
122 primary_receiver: PrimaryReceiver<N>,
123 ) -> Result<()> {
124 info!("Starting the BFT instance...");
125 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
127 self.start_handlers(bft_receiver);
129 self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
131 if let Some(consensus_sender) = consensus_sender {
134 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
135 }
136 Ok(())
137 }
138
139 pub fn is_synced(&self) -> bool {
141 self.primary.is_synced()
142 }
143
144 pub const fn primary(&self) -> &Primary<N> {
146 &self.primary
147 }
148
149 pub const fn storage(&self) -> &Storage<N> {
151 self.primary.storage()
152 }
153
154 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
156 self.primary.ledger()
157 }
158
159 pub fn leader(&self) -> Option<Address<N>> {
161 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
162 }
163
164 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
166 &self.leader_certificate
167 }
168}
169
170impl<N: Network> BFT<N> {
171 pub fn num_unconfirmed_transmissions(&self) -> usize {
173 self.primary.num_unconfirmed_transmissions()
174 }
175
176 pub fn num_unconfirmed_ratifications(&self) -> usize {
178 self.primary.num_unconfirmed_ratifications()
179 }
180
181 pub fn num_unconfirmed_solutions(&self) -> usize {
183 self.primary.num_unconfirmed_solutions()
184 }
185
186 pub fn num_unconfirmed_transactions(&self) -> usize {
188 self.primary.num_unconfirmed_transactions()
189 }
190}
191
192impl<N: Network> BFT<N> {
193 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
195 self.primary.worker_transmission_ids()
196 }
197
198 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
200 self.primary.worker_transmissions()
201 }
202
203 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
205 self.primary.worker_solutions()
206 }
207
208 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
210 self.primary.worker_transactions()
211 }
212}
213
214impl<N: Network> BFT<N> {
215 fn update_to_next_round(&self, current_round: u64) -> bool {
217 let storage_round = self.storage().current_round();
219 if current_round < storage_round {
220 debug!(
221 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
222 );
223 return false;
224 }
225
226 let is_ready = match current_round % 2 == 0 {
228 true => self.update_leader_certificate_to_even_round(current_round),
229 false => self.is_leader_quorum_or_nonleaders_available(current_round),
230 };
231
232 #[cfg(feature = "metrics")]
233 {
234 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
235 if start > 0 {
237 let end = now();
238 let elapsed = std::time::Duration::from_secs((end - start) as u64);
239 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
240 }
241 }
242
243 if current_round % 2 == 0 {
245 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
247 if !is_ready {
249 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
250 }
251 let leader_round = leader_certificate.round();
253 match leader_round == current_round {
254 true => {
255 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
256 #[cfg(feature = "metrics")]
257 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
258 }
259 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
260 }
261 } else {
262 match is_ready {
263 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
264 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
265 }
266 }
267 }
268
269 if is_ready {
271 if let Err(e) = self.storage().increment_to_next_round(current_round) {
273 warn!("BFT failed to increment to the next round from round {current_round} - {e}");
274 return false;
275 }
276 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
278 }
279
280 is_ready
281 }
282
283 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
289 let current_round = self.storage().current_round();
291 if current_round != even_round {
293 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
294 return false;
295 }
296
297 if current_round % 2 != 0 || current_round < 2 {
299 error!("BFT cannot update the leader certificate in an odd round");
300 return false;
301 }
302
303 let current_certificates = self.storage().get_certificates_for_round(current_round);
305 if current_certificates.is_empty() {
307 *self.leader_certificate.write() = None;
309 return false;
310 }
311
312 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
314 Ok(committee) => committee,
315 Err(e) => {
316 error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
317 return false;
318 }
319 };
320 let leader = match self.ledger().latest_leader() {
322 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
323 _ => {
324 let computed_leader = match committee_lookback.get_leader(current_round) {
326 Ok(leader) => leader,
327 Err(e) => {
328 error!("BFT failed to compute the leader for the even round {current_round} - {e}");
329 return false;
330 }
331 };
332
333 self.ledger().update_latest_leader(current_round, computed_leader);
335
336 computed_leader
337 }
338 };
339 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
341 *self.leader_certificate.write() = leader_certificate.cloned();
342
343 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
344 }
345
346 fn is_even_round_ready_for_next_round(
350 &self,
351 certificates: IndexSet<BatchCertificate<N>>,
352 committee: Committee<N>,
353 current_round: u64,
354 ) -> bool {
355 let authors = certificates.into_iter().map(|c| c.author()).collect();
357 if !committee.is_quorum_threshold_reached(&authors) {
359 trace!("BFT failed to reach quorum threshold in even round {current_round}");
360 return false;
361 }
362 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
364 if leader_certificate.round() == current_round {
365 return true;
366 }
367 }
368 if self.is_timer_expired() {
370 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
371 return true;
372 }
373 false
375 }
376
377 fn is_timer_expired(&self) -> bool {
381 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
382 }
383
384 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
389 let current_round = self.storage().current_round();
391 if current_round != odd_round {
393 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
394 return false;
395 }
396 if current_round % 2 != 1 {
398 error!("BFT does not compute stakes for the leader certificate in an even round");
399 return false;
400 }
401 let current_certificates = self.storage().get_certificates_for_round(current_round);
403 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
405 Ok(committee) => committee,
406 Err(e) => {
407 error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
408 return false;
409 }
410 };
411 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
413 if !committee_lookback.is_quorum_threshold_reached(&authors) {
415 trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
416 return false;
417 }
418 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
420 return true;
422 };
423 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
425 leader_certificate.id(),
426 current_certificates,
427 &committee_lookback,
428 );
429 stake_with_leader >= committee_lookback.availability_threshold()
431 || stake_without_leader >= committee_lookback.quorum_threshold()
432 || self.is_timer_expired()
433 }
434
435 fn compute_stake_for_leader_certificate(
437 &self,
438 leader_certificate_id: Field<N>,
439 current_certificates: IndexSet<BatchCertificate<N>>,
440 current_committee: &Committee<N>,
441 ) -> (u64, u64) {
442 if current_certificates.is_empty() {
444 return (0, 0);
445 }
446
447 let mut stake_with_leader = 0u64;
449 let mut stake_without_leader = 0u64;
451 for certificate in current_certificates {
453 let stake = current_committee.get_stake(certificate.author());
455 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
457 true => stake_with_leader = stake_with_leader.saturating_add(stake),
459 false => stake_without_leader = stake_without_leader.saturating_add(stake),
461 }
462 }
463 (stake_with_leader, stake_without_leader)
465 }
466}
467
468impl<N: Network> BFT<N> {
    /// Inserts the given certificate into the DAG and, if the preceding even round's
    /// leader certificate has gathered enough support, commits that leader certificate.
    ///
    /// The const parameters are forwarded unchanged to `commit_leader_certificate`.
    ///
    /// Returns `Ok(())` without committing when the commit round is odd or below 2,
    /// when it is not past the DAG's last committed round, when the leader certificate
    /// is not in the DAG yet, or when the availability threshold is not reached.
    ///
    /// # Errors
    /// Fails if a committee lookback or the leader cannot be obtained, if the DAG has
    /// no certificates for the certificate's round, or if the commit itself fails.
    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Acquire the BFT lock; it is held until this function returns, including across the commit.
        let _lock = self.lock.lock().await;

        // Retrieve the round of the new certificate before it is moved into the DAG.
        let certificate_round = certificate.round();

        // Insert the certificate into the DAG.
        self.dag.write().insert(certificate);

        // The round that may be committed is the one immediately before the certificate's round.
        let commit_round = certificate_round.saturating_sub(1);

        // Only even rounds starting from round 2 can be committed.
        if commit_round % 2 != 0 || commit_round < 2 {
            return Ok(());
        }
        // Skip rounds that are not past the last committed round.
        if commit_round <= self.dag.read().last_committed_round() {
            return Ok(());
        }

        trace!("Checking if the leader is ready to be committed for round {commit_round}...");

        // Retrieve the committee lookback for the commit round.
        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
        };

        // Use the cached leader if it belongs to the commit round; otherwise compute and cache it.
        let leader = match self.ledger().latest_leader() {
            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
            _ => {
                let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
                    bail!("BFT failed to compute the leader for commit round {commit_round}");
                };

                self.ledger().update_latest_leader(commit_round, computed_leader);

                computed_leader
            }
        };

        // Retrieve the leader's certificate for the commit round; it may simply not have arrived yet.
        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
        else {
            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
            return Ok(());
        };
        // Retrieve all certificates of the **certificate** round (the round after the commit round).
        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
        };
        // Retrieve the committee lookback for the certificate round.
        let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
        else {
            bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
        };
        // Collect the authors of the certificates that reference the leader certificate.
        let authors = certificates
            .values()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();
        // The leader is only committed once those authors reach the availability threshold.
        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            trace!("BFT is not ready to commit {commit_round}");
            return Ok(());
        }

        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));

        // Commit the leader certificate (together with any linked, uncommitted earlier leaders).
        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
    }
557
    /// Commits the given leader certificate, along with every earlier uncommitted
    /// leader certificate that is linked to it.
    ///
    /// The function works in two phases:
    /// 1. Walk the even rounds backwards from two rounds before the leader's round down
    ///    to two rounds after the last committed round, collecting each round's leader
    ///    certificate if it is linked (see `is_linked`) to the most recently collected one.
    /// 2. Process the collected leaders from oldest to newest: order each leader's subdag
    ///    with `order_dag_with_dfs`; when `IS_SYNCING` is `false`, gather the subdag's
    ///    transmissions and send them to consensus (if a consensus sender is set); then
    ///    mark the subdag's certificates as committed in the DAG.
    ///
    /// Finally, storage is asked to garbage collect certificates based on the round of
    /// the leader certificate that was passed in.
    ///
    /// `ALLOW_LEDGER_ACCESS` is forwarded to `order_dag_with_dfs`.
    ///
    /// # Errors
    /// Fails if a committee lookback or leader cannot be obtained, if the DFS ordering
    /// fails, if a ratification is encountered, if a transmission is missing from
    /// storage, if the subdag cannot be constructed, if its anchor round does not match
    /// the leader round, or if sending to consensus fails. Note that an error reported
    /// back by consensus (or a dropped callback) is logged and results in an early
    /// `Ok(())` instead of an error.
    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Remember the round of the newest leader; it is used for garbage collection at the end.
        let latest_leader_round = leader_certificate.round();
        // The leaders to commit, collected newest first.
        let mut leader_certificates = vec![leader_certificate.clone()];
        {
            let leader_round = leader_certificate.round();

            // The most recently collected leader; earlier leaders must be linked to it.
            let mut current_certificate = leader_certificate;
            // Visit the earlier even rounds in descending order, two rounds at a time.
            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
            {
                // Retrieve the committee lookback for this earlier round.
                let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
                    Ok(committee) => committee,
                    Err(e) => {
                        bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
                    }
                };
                // Use the cached leader if it belongs to this round; otherwise compute and cache it.
                let leader = match self.ledger().latest_leader() {
                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
                    _ => {
                        let computed_leader = match previous_committee_lookback.get_leader(round) {
                            Ok(leader) => leader,
                            Err(e) => {
                                bail!("BFT failed to compute the leader for the even round {round} - {e}");
                            }
                        };

                        self.ledger().update_latest_leader(round, computed_leader);

                        computed_leader
                    }
                };
                // If this round's leader certificate is not in the DAG, move on to the next round.
                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
                else {
                    continue;
                };
                // Collect the earlier leader only if it is linked to the most recently collected one.
                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                    leader_certificates.push(previous_certificate.clone());
                    current_certificate = previous_certificate;
                }
            }
        }

        // Commit the leaders from the oldest to the newest.
        for leader_certificate in leader_certificates.into_iter().rev() {
            let leader_round = leader_certificate.round();
            // Compute the subdag (round -> certificates) anchored at this leader.
            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
                Ok(subdag) => subdag,
                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
            };
            // Transmissions are only gathered and sent to consensus when not syncing.
            if !IS_SYNCING {
                // The transmissions to send, keyed by transmission ID.
                let mut transmissions = IndexMap::new();
                // The transaction and solution IDs that have already been added.
                let mut seen_transaction_ids = IndexSet::new();
                let mut seen_solution_ids = IndexSet::new();
                for certificate in commit_subdag.values().flatten() {
                    for transmission_id in certificate.transmission_ids() {
                        // Skip IDs that were already added; ratifications are rejected outright.
                        match transmission_id {
                            TransmissionID::Solution(solution_id, _) => {
                                if seen_solution_ids.contains(&solution_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Transaction(transaction_id, _) => {
                                if seen_transaction_ids.contains(transaction_id) {
                                    continue;
                                }
                            }
                            TransmissionID::Ratification => {
                                bail!("Ratifications are currently not supported in the BFT.")
                            }
                        }
                        // Skip transmissions that are already in the map.
                        if transmissions.contains_key(transmission_id) {
                            continue;
                        }
                        // Skip transmissions the ledger already contains; a failed lookup is
                        // treated the same as "already contained".
                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
                            continue;
                        }
                        // Retrieve the transmission from storage; it must be present.
                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
                            bail!(
                                "BFT failed to retrieve transmission '{}.{}' from round {}",
                                fmt_id(transmission_id),
                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
                                certificate.round()
                            );
                        };
                        // Record the ID as seen.
                        match transmission_id {
                            TransmissionID::Solution(id, _) => {
                                seen_solution_ids.insert(id);
                            }
                            TransmissionID::Transaction(id, _) => {
                                seen_transaction_ids.insert(id);
                            }
                            TransmissionID::Ratification => {}
                        }
                        // Add the transmission to the map.
                        transmissions.insert(*transmission_id, transmission);
                    }
                }
                // Construct the subdag; the map is cloned because it is still needed for the DAG commit below.
                let subdag = Subdag::from(commit_subdag.clone())?;
                // Capture the values used for logging before the subdag is moved into the channel.
                let anchor_round = subdag.anchor_round();
                let num_transmissions = transmissions.len();
                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

                // The subdag's anchor round must be the leader's round.
                ensure!(
                    anchor_round == leader_round,
                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
                );

                // Hand the subdag and its transmissions to consensus, if a consensus sender is set.
                if let Some(consensus_sender) = self.consensus_sender.get() {
                    // The callback reports whether consensus advanced the subdag.
                    let (callback_sender, callback_receiver) = oneshot::channel();
                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
                    // Wait for the outcome; on failure, log and stop without committing this subdag to the DAG.
                    match callback_receiver.await {
                        Ok(Ok(())) => (),
                        Ok(Err(e)) => {
                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
                            return Ok(());
                        }
                        Err(e) => {
                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
                            return Ok(());
                        }
                    }
                }

                info!(
                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
                );
            }

            // Mark every certificate of the subdag as committed in the DAG.
            let mut dag_write = self.dag.write();
            for certificate in commit_subdag.values().flatten() {
                dag_write.commit(certificate, self.storage().max_gc_rounds());
            }

            // When telemetry is enabled, record the committed subdag with the validator telemetry.
            #[cfg(feature = "telemetry")]
            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
        }

        // Garbage collect certificates in storage, based on the newest leader's round.
        self.storage().garbage_collect_certificates(latest_leader_round);

        Ok(())
    }
759
    /// Collects the certificates reachable from the given leader certificate through
    /// previous-certificate links, grouped by round.
    ///
    /// The traversal is an iterative depth-first search driven by a stack. A previous
    /// certificate is not followed if it was already queued, if the DAG reports it as
    /// recently committed, or (only when `ALLOW_LEDGER_ACCESS` is `true`) if the ledger
    /// already contains it. Previous certificates are looked up in the DAG first and in
    /// storage second.
    ///
    /// Rounds that fall at or below the garbage collection horizon, i.e. where
    /// `round + max_gc_rounds <= last_committed_round`, are neither traversed further
    /// nor kept in the result.
    ///
    /// # Errors
    /// Fails if a previous certificate that must be followed is found neither in the
    /// DAG nor in storage.
    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
        // The result: certificates to commit, keyed (and therefore sorted) by round.
        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
        // The IDs of certificates that have already been pushed onto the stack.
        let mut already_ordered = HashSet::new();
        // The DFS stack, seeded with the leader certificate.
        let mut buffer = vec![leader_certificate];
        while let Some(certificate) = buffer.pop() {
            // Record the certificate under its round.
            commit.entry(certificate.round()).or_default().insert(certificate.clone());

            // Do not descend into rounds at or below the garbage collection horizon.
            let previous_round = certificate.round().saturating_sub(1);
            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
                continue;
            }
            // Visit the previous certificate IDs in reverse; since the stack is last-in-first-out,
            // the first listed ID ends up being popped first.
            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
                // Skip certificates that were already queued.
                if already_ordered.contains(previous_certificate_id) {
                    continue;
                }
                // Skip certificates that the DAG reports as recently committed.
                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
                    continue;
                }
                // Skip certificates that are already in the ledger (a failed lookup counts as "not contained").
                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
                    continue;
                }

                // Retrieve the previous certificate: from the DAG first, then from storage.
                let previous_certificate = {
                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
                        Some(previous_certificate) => previous_certificate,
                        None => match self.storage().get_certificate(*previous_certificate_id) {
                            Some(previous_certificate) => previous_certificate,
                            None => bail!(
                                "Missing previous certificate {} for round {previous_round}",
                                fmt_id(previous_certificate_id)
                            ),
                        },
                    }
                };
                // Mark the certificate as queued and push it onto the stack.
                already_ordered.insert(previous_certificate.id());
                buffer.push(previous_certificate);
            }
        }
        // Drop every round that is at or below the garbage collection horizon.
        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
        Ok(commit)
    }
830
831 fn is_linked(
833 &self,
834 previous_certificate: BatchCertificate<N>,
835 current_certificate: BatchCertificate<N>,
836 ) -> Result<bool> {
837 let mut traversal = vec![current_certificate.clone()];
839 for round in (previous_certificate.round()..current_certificate.round()).rev() {
841 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
843 bail!("BFT failed to retrieve the certificates for past round {round}");
846 };
847 traversal = certificates
849 .into_values()
850 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
851 .collect();
852 }
853 Ok(traversal.contains(&previous_certificate))
854 }
855}
856
857impl<N: Network> BFT<N> {
858 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
860 let BFTReceiver {
861 mut rx_primary_round,
862 mut rx_primary_certificate,
863 mut rx_sync_bft_dag_at_bootup,
864 mut rx_sync_bft,
865 } = bft_receiver;
866
867 let self_ = self.clone();
869 self.spawn(async move {
870 while let Some((current_round, callback)) = rx_primary_round.recv().await {
871 callback.send(self_.update_to_next_round(current_round)).ok();
872 }
873 });
874
875 let self_ = self.clone();
877 self.spawn(async move {
878 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
879 let result = self_.update_dag::<true, false>(certificate).await;
881 callback.send(result).ok();
884 }
885 });
886
887 let self_ = self.clone();
889 self.spawn(async move {
890 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
891 self_.sync_bft_dag_at_bootup(certificates).await;
892 }
893 });
894
895 let self_ = self.clone();
897 self.spawn(async move {
898 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
899 let result = self_.update_dag::<true, true>(certificate).await;
901 callback.send(result).ok();
904 }
905 });
906 }
907
908 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
915 let mut dag = self.dag.write();
917
918 for certificate in certificates {
920 dag.commit(&certificate, self.storage().max_gc_rounds());
921 }
922 }
923
924 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
926 self.handles.lock().push(tokio::spawn(future));
927 }
928
929 pub async fn shut_down(&self) {
931 info!("Shutting down the BFT...");
932 let _lock = self.lock.lock().await;
934 self.primary.shut_down().await;
936 self.handles.lock().iter().for_each(|handle| handle.abort());
938 }
939}
940
941#[cfg(test)]
942mod tests {
943 use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
944 use snarkos_account::Account;
945 use snarkos_node_bft_ledger_service::MockLedgerService;
946 use snarkos_node_bft_storage_service::BFTMemoryService;
947 use snarkos_node_sync::BlockSync;
948 use snarkvm::{
949 console::account::{Address, PrivateKey},
950 ledger::{
951 committee::Committee,
952 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
953 },
954 utilities::TestRng,
955 };
956
957 use aleo_std::StorageMode;
958 use anyhow::Result;
959 use indexmap::{IndexMap, IndexSet};
960 use std::sync::Arc;
961
962 type CurrentNetwork = snarkvm::console::network::MainnetV0;
963
964 fn sample_test_instance(
966 committee_round: Option<u64>,
967 max_gc_rounds: u64,
968 rng: &mut TestRng,
969 ) -> (
970 Committee<CurrentNetwork>,
971 Account<CurrentNetwork>,
972 Arc<MockLedgerService<CurrentNetwork>>,
973 Storage<CurrentNetwork>,
974 ) {
975 let committee = match committee_round {
976 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
977 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
978 };
979 let account = Account::new(rng).unwrap();
980 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
981 let transmissions = Arc::new(BFTMemoryService::new());
982 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
983
984 (committee, account, ledger, storage)
985 }
986
987 fn initialize_bft(
989 account: Account<CurrentNetwork>,
990 storage: Storage<CurrentNetwork>,
991 ledger: Arc<MockLedgerService<CurrentNetwork>>,
992 ) -> anyhow::Result<BFT<CurrentNetwork>> {
993 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
995 BFT::new(
997 account.clone(),
998 storage.clone(),
999 ledger.clone(),
1000 block_sync,
1001 None,
1002 &[],
1003 StorageMode::new_test(None),
1004 None,
1005 )
1006 }
1007
1008 #[test]
1009 #[tracing_test::traced_test]
1010 fn test_is_leader_quorum_odd() -> Result<()> {
1011 let rng = &mut TestRng::default();
1012
1013 let mut certificates = IndexSet::new();
1015 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1016 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1017 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1018 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1019
1020 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1022 1,
1023 vec![
1024 certificates[0].author(),
1025 certificates[1].author(),
1026 certificates[2].author(),
1027 certificates[3].author(),
1028 ],
1029 rng,
1030 );
1031
1032 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1034 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1036 let account = Account::new(rng)?;
1038 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1040 assert!(bft.is_timer_expired());
1041 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1043 assert!(!result);
1045 for certificate in certificates.iter() {
1047 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1048 }
1049 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1051 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1054 *bft.leader_certificate.write() = Some(leader_certificate);
1055 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1057 assert!(result); Ok(())
1060 }
1061
1062 #[test]
1063 #[tracing_test::traced_test]
1064 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1065 let rng = &mut TestRng::default();
1066
1067 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1069 assert_eq!(committee.starting_round(), 1);
1070 assert_eq!(storage.current_round(), 1);
1071 assert_eq!(storage.max_gc_rounds(), 10);
1072
1073 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1075 assert!(bft.is_timer_expired());
1076
1077 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1080 assert!(!result);
1081 Ok(())
1082 }
1083
1084 #[test]
1085 #[tracing_test::traced_test]
1086 fn test_is_leader_quorum_even() -> Result<()> {
1087 let rng = &mut TestRng::default();
1088
1089 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1091 assert_eq!(committee.starting_round(), 2);
1092 assert_eq!(storage.current_round(), 2);
1093 assert_eq!(storage.max_gc_rounds(), 10);
1094
1095 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1097 assert!(bft.is_timer_expired());
1098
1099 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1101 assert!(!result);
1102 Ok(())
1103 }
1104
1105 #[test]
1106 #[tracing_test::traced_test]
1107 fn test_is_even_round_ready() -> Result<()> {
1108 let rng = &mut TestRng::default();
1109
1110 let mut certificates = IndexSet::new();
1112 certificates.insert(sample_batch_certificate_for_round(2, rng));
1113 certificates.insert(sample_batch_certificate_for_round(2, rng));
1114 certificates.insert(sample_batch_certificate_for_round(2, rng));
1115 certificates.insert(sample_batch_certificate_for_round(2, rng));
1116
1117 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1119 2,
1120 vec![
1121 certificates[0].author(),
1122 certificates[1].author(),
1123 certificates[2].author(),
1124 certificates[3].author(),
1125 ],
1126 rng,
1127 );
1128
1129 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1131 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1133 let account = Account::new(rng)?;
1135
1136 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1138 assert!(bft.is_timer_expired());
1139
1140 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1142 *bft.leader_certificate.write() = Some(leader_certificate);
1143 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1144 assert!(!result);
1146 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1148 assert!(result);
1149
1150 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1152 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1154 if !bft_timer.is_timer_expired() {
1155 assert!(!result);
1156 }
1157 let leader_certificate_timeout =
1159 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1160 std::thread::sleep(leader_certificate_timeout);
1161 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1163 if bft_timer.is_timer_expired() {
1164 assert!(result);
1165 } else {
1166 assert!(!result);
1167 }
1168
1169 Ok(())
1170 }
1171
1172 #[test]
1173 #[tracing_test::traced_test]
1174 fn test_update_leader_certificate_odd() -> Result<()> {
1175 let rng = &mut TestRng::default();
1176
1177 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1179 assert_eq!(storage.max_gc_rounds(), 10);
1180
1181 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1183 assert!(bft.is_timer_expired());
1184
1185 let result = bft.update_leader_certificate_to_even_round(1);
1187 assert!(!result);
1188 Ok(())
1189 }
1190
1191 #[test]
1192 #[tracing_test::traced_test]
1193 fn test_update_leader_certificate_bad_round() -> Result<()> {
1194 let rng = &mut TestRng::default();
1195
1196 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1198 assert_eq!(storage.max_gc_rounds(), 10);
1199
1200 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1202
1203 let result = bft.update_leader_certificate_to_even_round(6);
1205 assert!(!result);
1206 Ok(())
1207 }
1208
// Expects `update_leader_certificate_to_even_round` to return `true` for round 2 once storage
// holds four round certificates and the BFT's leader certificate is set to the one authored by
// the round-2 leader.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    let rng = &mut TestRng::default();

    let current_round = 3;
    // The even round that the update is attempted for.
    let even_round = 2;

    // Sample the certificates preceding `current_round`; the first tuple element is not needed.
    let (_, certificates) =
        snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
            current_round,
            rng,
        );

    // The committee consists of the authors of the first four certificates.
    let members = certificates[..4].iter().map(|certificate| certificate.author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(even_round, members, rng);

    // Back the storage with a mock ledger built on that committee.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    for certificate in &certificates[..4] {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert_eq!(storage.current_round(), even_round);

    // Look up the certificate authored by the leader of the even round.
    let leader = committee.get_leader(even_round).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(even_round, leader).unwrap();

    let account = Account::new(rng)?;
    let bft = initialize_bft(account, storage, ledger)?;

    // Install the leader certificate directly on the BFT before attempting the update.
    *bft.leader_certificate.write() = Some(leader_certificate);

    assert!(bft.update_leader_certificate_to_even_round(even_round));

    Ok(())
}
1265
1266 #[tokio::test]
1267 #[tracing_test::traced_test]
1268 async fn test_order_dag_with_dfs() -> Result<()> {
1269 let rng = &mut TestRng::default();
1270
1271 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1273
1274 let previous_round = 2; let current_round = previous_round + 1;
1277
1278 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1280 current_round,
1281 rng,
1282 );
1283
1284 {
1288 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1290 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1292
1293 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1295
1296 for certificate in previous_certificates.clone() {
1298 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1299 }
1300
1301 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1303 assert!(result.is_ok());
1304 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1305 assert_eq!(candidate_certificates.len(), 1);
1306 let expected_certificates = vec![certificate.clone()];
1307 assert_eq!(
1308 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1309 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1310 );
1311 assert_eq!(candidate_certificates, expected_certificates);
1312 }
1313
1314 {
1318 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1320 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1322
1323 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1325
1326 for certificate in previous_certificates.clone() {
1328 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1329 }
1330
1331 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1333 assert!(result.is_ok());
1334 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1335 assert_eq!(candidate_certificates.len(), 5);
1336 let expected_certificates = vec![
1337 previous_certificates[0].clone(),
1338 previous_certificates[1].clone(),
1339 previous_certificates[2].clone(),
1340 previous_certificates[3].clone(),
1341 certificate,
1342 ];
1343 assert_eq!(
1344 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1345 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1346 );
1347 assert_eq!(candidate_certificates, expected_certificates);
1348 }
1349
1350 Ok(())
1351 }
1352
// Expects `order_dag_with_dfs` to fail when no previous certificate has been added to the DAG.
// The expected error message names the fourth previous certificate ID and the previous round.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample a test instance and sanity-check its starting state.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample a certificate for the current round along with its previous certificates.
    let (certificate, previous_certificates) =
        snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
            current_round,
            rng,
        );
    let previous_certificate_ids =
        previous_certificates.iter().map(|previous| previous.id()).collect::<IndexSet<_>>();

    // The previous certificates are deliberately NOT inserted into the DAG.
    let bft = initialize_bft(account, storage, ledger)?;

    // Attempt the ordering; it has nothing to resolve the previous certificates against.
    let outcome = bft.order_dag_with_dfs::<false>(certificate);

    // Build the message the failure is expected to carry.
    let error_msg = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().to_string(), error_msg);
    Ok(())
}
1393
1394 #[tokio::test]
1395 #[tracing_test::traced_test]
1396 async fn test_bft_gc_on_commit() -> Result<()> {
1397 let rng = &mut TestRng::default();
1398
1399 let max_gc_rounds = 1;
1401 let committee_round = 0;
1402 let commit_round = 2;
1403 let current_round = commit_round + 1;
1404
1405 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1407 current_round,
1408 rng,
1409 );
1410
1411 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1413 committee_round,
1414 vec![
1415 certificates[0].author(),
1416 certificates[1].author(),
1417 certificates[2].author(),
1418 certificates[3].author(),
1419 ],
1420 rng,
1421 );
1422
1423 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1425
1426 let transmissions = Arc::new(BFTMemoryService::new());
1428 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1429 for certificate in certificates.iter() {
1431 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1432 }
1433
1434 let leader = committee.get_leader(commit_round).unwrap();
1436 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1437
1438 let account = Account::new(rng)?;
1440 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1441
1442 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1443
1444 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1446
1447 for certificate in certificates {
1449 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1450 }
1451
1452 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1454
1455 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1457
1458 Ok(())
1459 }
1460
1461 #[tokio::test]
1462 #[tracing_test::traced_test]
1463 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1464 let rng = &mut TestRng::default();
1465
1466 let max_gc_rounds = 1;
1468 let committee_round = 0;
1469 let commit_round = 2;
1470 let current_round = commit_round + 1;
1471
1472 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1474 current_round,
1475 rng,
1476 );
1477
1478 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1480 committee_round,
1481 vec![
1482 certificates[0].author(),
1483 certificates[1].author(),
1484 certificates[2].author(),
1485 certificates[3].author(),
1486 ],
1487 rng,
1488 );
1489
1490 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1492
1493 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1495 for certificate in certificates.iter() {
1497 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1498 }
1499
1500 let leader = committee.get_leader(commit_round).unwrap();
1502 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1503
1504 let account = Account::new(rng)?;
1506 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1507
1508 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1510
1511 for certificate in certificates.clone() {
1513 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1514 }
1515
1516 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1518
1519 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1523 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1525
1526 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1528
1529 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1531
1532 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1534 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1535
1536 for certificate in certificates {
1538 let certificate_round = certificate.round();
1539 let certificate_id = certificate.id();
1540 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1542 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1545 }
1546
1547 Ok(())
1548 }
1549
1550 #[tokio::test]
1551 #[tracing_test::traced_test]
1552 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1553 let rng = &mut TestRng::default();
1560
1561 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1563 let committee_round = 0;
1564 let commit_round = 2;
1565 let current_round = commit_round + 1;
1566 let next_round = current_round + 1;
1567
1568 let (round_to_certificates_map, committee) = {
1570 let private_keys = vec![
1571 PrivateKey::new(rng).unwrap(),
1572 PrivateKey::new(rng).unwrap(),
1573 PrivateKey::new(rng).unwrap(),
1574 PrivateKey::new(rng).unwrap(),
1575 ];
1576 let addresses = vec![
1577 Address::try_from(private_keys[0])?,
1578 Address::try_from(private_keys[1])?,
1579 Address::try_from(private_keys[2])?,
1580 Address::try_from(private_keys[3])?,
1581 ];
1582 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1583 committee_round,
1584 addresses,
1585 rng,
1586 );
1587 let mut round_to_certificates_map: IndexMap<
1589 u64,
1590 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1591 > = IndexMap::new();
1592 let mut previous_certificates = IndexSet::with_capacity(4);
1593 for _ in 0..4 {
1595 previous_certificates.insert(sample_batch_certificate(rng));
1596 }
1597 for round in 0..commit_round + 3 {
1598 let mut current_certificates = IndexSet::new();
1599 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1600 IndexSet::new()
1601 } else {
1602 previous_certificates.iter().map(|c| c.id()).collect()
1603 };
1604 let transmission_ids =
1605 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1606 .into_iter()
1607 .collect::<IndexSet<_>>();
1608 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1609 let committee_id = committee.id();
1610 for (i, private_key_1) in private_keys.iter().enumerate() {
1611 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1612 private_key_1,
1613 round,
1614 timestamp,
1615 committee_id,
1616 transmission_ids.clone(),
1617 previous_certificate_ids.clone(),
1618 rng,
1619 )
1620 .unwrap();
1621 let mut signatures = IndexSet::with_capacity(4);
1622 for (j, private_key_2) in private_keys.iter().enumerate() {
1623 if i != j {
1624 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1625 }
1626 }
1627 let certificate =
1628 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1629 current_certificates.insert(certificate);
1630 }
1631 round_to_certificates_map.insert(round, current_certificates.clone());
1633 previous_certificates = current_certificates.clone();
1634 }
1635 (round_to_certificates_map, committee)
1636 };
1637
1638 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1640 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1642 let leader = committee.get_leader(commit_round).unwrap();
1644 let next_leader = committee.get_leader(next_round).unwrap();
1645 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1647 for i in 1..=commit_round {
1648 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1649 if i == commit_round {
1650 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1652 if let Some(c) = leader_certificate {
1653 pre_shutdown_certificates.push(c.clone());
1654 }
1655 continue;
1656 }
1657 pre_shutdown_certificates.extend(certificates);
1658 }
1659 for certificate in pre_shutdown_certificates.iter() {
1660 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1661 }
1662 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1664 Vec::new();
1665 for j in commit_round..=commit_round + 2 {
1666 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1667 post_shutdown_certificates.extend(certificate);
1668 }
1669 for certificate in post_shutdown_certificates.iter() {
1670 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1671 }
1672 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1674 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1675
1676 let account = Account::new(rng)?;
1678 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1679
1680 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1682
1683 for certificate in pre_shutdown_certificates.clone() {
1685 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1686 }
1687
1688 for certificate in post_shutdown_certificates.clone() {
1690 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1691 }
1692 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1694 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1695 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1696
1697 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1701
1702 let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1704
1705 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1707
1708 for certificate in post_shutdown_certificates.iter() {
1710 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1711 }
1712 for certificate in post_shutdown_certificates.clone() {
1713 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1714 }
1715 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1717 let commit_subdag_metadata_bootup =
1718 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1719 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1720 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1721
1722 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1726
1727 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1729 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1730 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1731 assert!(
1732 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1733 );
1734
1735 for certificate in pre_shutdown_certificates.clone() {
1737 let certificate_round = certificate.round();
1738 let certificate_id = certificate.id();
1739 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1741 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1742 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1745 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1746 }
1747
1748 for certificate in committed_certificates_bootup.clone() {
1750 let certificate_round = certificate.round();
1751 let certificate_id = certificate.id();
1752 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1755 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1759 }
1760
1761 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1763
1764 Ok(())
1765 }
1766
1767 #[tokio::test]
1768 #[tracing_test::traced_test]
1769 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1770 let rng = &mut TestRng::default();
1777
1778 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1780 let committee_round = 0;
1781 let commit_round = 2;
1782 let current_round = commit_round + 1;
1783 let next_round = current_round + 1;
1784
1785 let (round_to_certificates_map, committee) = {
1787 let private_keys = vec![
1788 PrivateKey::new(rng).unwrap(),
1789 PrivateKey::new(rng).unwrap(),
1790 PrivateKey::new(rng).unwrap(),
1791 PrivateKey::new(rng).unwrap(),
1792 ];
1793 let addresses = vec![
1794 Address::try_from(private_keys[0])?,
1795 Address::try_from(private_keys[1])?,
1796 Address::try_from(private_keys[2])?,
1797 Address::try_from(private_keys[3])?,
1798 ];
1799 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1800 committee_round,
1801 addresses,
1802 rng,
1803 );
1804 let mut round_to_certificates_map: IndexMap<
1806 u64,
1807 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1808 > = IndexMap::new();
1809 let mut previous_certificates = IndexSet::with_capacity(4);
1810 for _ in 0..4 {
1812 previous_certificates.insert(sample_batch_certificate(rng));
1813 }
1814 for round in 0..=commit_round + 2 {
1815 let mut current_certificates = IndexSet::new();
1816 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1817 IndexSet::new()
1818 } else {
1819 previous_certificates.iter().map(|c| c.id()).collect()
1820 };
1821 let transmission_ids =
1822 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1823 .into_iter()
1824 .collect::<IndexSet<_>>();
1825 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1826 let committee_id = committee.id();
1827 for (i, private_key_1) in private_keys.iter().enumerate() {
1828 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1829 private_key_1,
1830 round,
1831 timestamp,
1832 committee_id,
1833 transmission_ids.clone(),
1834 previous_certificate_ids.clone(),
1835 rng,
1836 )
1837 .unwrap();
1838 let mut signatures = IndexSet::with_capacity(4);
1839 for (j, private_key_2) in private_keys.iter().enumerate() {
1840 if i != j {
1841 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1842 }
1843 }
1844 let certificate =
1845 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1846 current_certificates.insert(certificate);
1847 }
1848 round_to_certificates_map.insert(round, current_certificates.clone());
1850 previous_certificates = current_certificates.clone();
1851 }
1852 (round_to_certificates_map, committee)
1853 };
1854
1855 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1857 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1859 let leader = committee.get_leader(commit_round).unwrap();
1861 let next_leader = committee.get_leader(next_round).unwrap();
1862 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1864 for i in 1..=commit_round {
1865 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1866 if i == commit_round {
1867 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1869 if let Some(c) = leader_certificate {
1870 pre_shutdown_certificates.push(c.clone());
1871 }
1872 continue;
1873 }
1874 pre_shutdown_certificates.extend(certificates);
1875 }
1876 for certificate in pre_shutdown_certificates.iter() {
1877 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1878 }
1879 let account = Account::new(rng)?;
1881 let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1882
1883 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1885 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1887
1888 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1890 Vec::new();
1891 for j in commit_round..=commit_round + 2 {
1892 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1893 post_shutdown_certificates.extend(certificate);
1894 }
1895 for certificate in post_shutdown_certificates.iter() {
1896 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1897 }
1898
1899 for certificate in post_shutdown_certificates.clone() {
1901 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1902 }
1903
1904 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1906 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1907 let committed_certificates = commit_subdag.values().flatten();
1908
1909 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1911 for committed_certificate in committed_certificates.clone() {
1912 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1913 }
1914 }
1915 Ok(())
1916 }
1917}