1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
19 primary::{Primary, PrimaryCallback},
20 sync::SyncCallback,
21};
22
23use snarkos_account::Account;
24use snarkos_node_bft_ledger_service::LedgerService;
25use snarkos_node_sync::{BlockSync, Ping};
26use snarkos_utilities::NodeDataDir;
27
28use snarkvm::{
29 console::account::Address,
30 ledger::{
31 block::Transaction,
32 committee::Committee,
33 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
34 puzzle::{Solution, SolutionID},
35 },
36 prelude::{Field, Network, Result, bail, ensure},
37 utilities::flatten_error,
38};
39
40use anyhow::Context;
41use colored::Colorize;
42use indexmap::{IndexMap, IndexSet};
43#[cfg(feature = "locktick")]
44use locktick::{parking_lot::RwLock, tokio::Mutex as TMutex};
45#[cfg(not(feature = "locktick"))]
46use parking_lot::RwLock;
47use std::{
48 collections::{BTreeMap, HashSet},
49 net::SocketAddr,
50 sync::{
51 Arc,
52 atomic::{AtomicI64, Ordering},
53 },
54};
55#[cfg(not(feature = "locktick"))]
56use tokio::sync::Mutex as TMutex;
57use tokio::sync::{OnceCell, oneshot};
58
59#[derive(Clone)]
60pub struct BFT<N: Network> {
61 primary: Primary<N>,
63 dag: Arc<RwLock<DAG<N>>>,
65 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
67 leader_certificate_timer: Arc<AtomicI64>,
69 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
71 lock: Arc<TMutex<()>>,
73}
74
75impl<N: Network> BFT<N> {
76 #[allow(clippy::too_many_arguments)]
78 pub fn new(
79 account: Account<N>,
80 storage: Storage<N>,
81 ledger: Arc<dyn LedgerService<N>>,
82 block_sync: Arc<BlockSync<N>>,
83 ip: Option<SocketAddr>,
84 trusted_validators: &[SocketAddr],
85 trusted_peers_only: bool,
86 node_data_dir: NodeDataDir,
87 dev: Option<u16>,
88 ) -> Result<Self> {
89 Ok(Self {
90 primary: Primary::new(
91 account,
92 storage,
93 ledger,
94 block_sync,
95 ip,
96 trusted_validators,
97 trusted_peers_only,
98 node_data_dir,
99 dev,
100 )?,
101 dag: Default::default(),
102 leader_certificate: Default::default(),
103 leader_certificate_timer: Default::default(),
104 consensus_sender: Default::default(),
105 lock: Default::default(),
106 })
107 }
108
109 pub async fn run(
114 &mut self,
115 ping: Option<Arc<Ping<N>>>,
116 consensus_sender: Option<ConsensusSender<N>>,
117 primary_sender: PrimarySender<N>,
118 primary_receiver: PrimaryReceiver<N>,
119 ) -> Result<()> {
120 info!("Starting the BFT instance...");
121 let primary_callback = Some(Arc::new(self.clone()) as Arc<dyn PrimaryCallback<N>>);
123
124 let sync_callback = Some(Arc::new(self.clone()) as Arc<dyn SyncCallback<N>>);
125
126 self.primary.run(ping, primary_callback, sync_callback, primary_sender, primary_receiver).await?;
128
129 if let Some(consensus_sender) = consensus_sender {
132 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
133 }
134 Ok(())
135 }
136
137 pub fn is_synced(&self) -> bool {
139 self.primary.is_synced()
140 }
141
142 pub const fn primary(&self) -> &Primary<N> {
144 &self.primary
145 }
146
147 pub const fn storage(&self) -> &Storage<N> {
149 self.primary.storage()
150 }
151
152 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
154 self.primary.ledger()
155 }
156
157 pub fn leader(&self) -> Option<Address<N>> {
159 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
160 }
161
162 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
164 &self.leader_certificate
165 }
166}
167
168impl<N: Network> BFT<N> {
169 pub fn num_unconfirmed_transmissions(&self) -> usize {
171 self.primary.num_unconfirmed_transmissions()
172 }
173
174 pub fn num_unconfirmed_ratifications(&self) -> usize {
176 self.primary.num_unconfirmed_ratifications()
177 }
178
179 pub fn num_unconfirmed_solutions(&self) -> usize {
181 self.primary.num_unconfirmed_solutions()
182 }
183
184 pub fn num_unconfirmed_transactions(&self) -> usize {
186 self.primary.num_unconfirmed_transactions()
187 }
188}
189
190impl<N: Network> BFT<N> {
191 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
193 self.primary.worker_transmission_ids()
194 }
195
196 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
198 self.primary.worker_transmissions()
199 }
200
201 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
203 self.primary.worker_solutions()
204 }
205
206 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
208 self.primary.worker_transactions()
209 }
210}
211
212#[async_trait::async_trait]
213impl<N: Network> PrimaryCallback<N> for BFT<N> {
214 fn update_to_next_round(&self, current_round: u64) -> bool {
216 let storage_round = self.storage().current_round();
218 if current_round < storage_round {
219 debug!(
220 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
221 );
222 return false;
223 }
224
225 let is_ready = match current_round.is_multiple_of(2) {
227 true => self.update_leader_certificate_to_even_round(current_round),
228 false => self.is_leader_quorum_or_nonleaders_available(current_round),
229 };
230
231 #[cfg(feature = "metrics")]
232 {
233 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
234 if start > 0 {
236 let end = now();
237 let elapsed = std::time::Duration::from_secs((end - start) as u64);
238 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
239 }
240 }
241
242 if current_round.is_multiple_of(2) {
244 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
246 if !is_ready {
248 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
249 }
250 let leader_round = leader_certificate.round();
252 match leader_round == current_round {
253 true => {
254 info!("Round {current_round} elected a leader - {}", leader_certificate.author());
255 #[cfg(feature = "metrics")]
256 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
257 }
258 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
259 }
260 } else {
261 match is_ready {
262 true => info!("Round {current_round} reached quorum without a leader"),
263 false => info!("{}", format!("Round {current_round} did not elect a leader (yet)").dimmed()),
264 }
265 }
266 }
267
268 if is_ready {
270 if let Err(err) = self
272 .storage()
273 .increment_to_next_round(current_round)
274 .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
275 {
276 warn!("{}", &flatten_error(err));
277 return false;
278 }
279 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
281 }
282
283 is_ready
284 }
285
286 async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
288 self.update_dag::<true, false>(certificate).await
290 }
291}
292
293#[async_trait::async_trait]
294impl<N: Network> SyncCallback<N> for BFT<N> {
295 async fn sync_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
302 let mut dag = self.dag.write();
304
305 for certificate in certificates {
307 dag.commit(&certificate, self.storage().max_gc_rounds());
308 }
309 }
310
311 async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
313 self.update_dag::<true, true>(certificate).await
315 }
316}
317
318impl<N: Network> BFT<N> {
319 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
325 let current_round = self.storage().current_round();
327 if current_round != even_round {
329 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
330 return false;
331 }
332
333 if !current_round.is_multiple_of(2) || current_round < 2 {
335 error!("BFT cannot update the leader certificate in an odd round");
336 return false;
337 }
338
339 let current_certificates = self.storage().get_certificates_for_round(current_round);
341 if current_certificates.is_empty() {
343 *self.leader_certificate.write() = None;
345 return false;
346 }
347
348 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
350 Ok(committee) => committee,
351 Err(err) => {
352 let err = err.context(format!(
353 "BFT failed to retrieve the committee lookback for the even round {current_round}"
354 ));
355 warn!("{}", &flatten_error(err));
356 return false;
357 }
358 };
359 let leader = match self.ledger().latest_leader() {
361 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
362 _ => {
363 let computed_leader = match committee_lookback.get_leader(current_round) {
365 Ok(leader) => leader,
366 Err(err) => {
367 let err =
368 err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
369 error!("{}", &flatten_error(err));
370 return false;
371 }
372 };
373
374 self.ledger().update_latest_leader(current_round, computed_leader);
376
377 computed_leader
378 }
379 };
380 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
382 *self.leader_certificate.write() = leader_certificate.cloned();
383
384 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
385 }
386
387 fn is_even_round_ready_for_next_round(
391 &self,
392 certificates: IndexSet<BatchCertificate<N>>,
393 committee: Committee<N>,
394 current_round: u64,
395 ) -> bool {
396 let authors = certificates.into_iter().map(|c| c.author()).collect();
398 if !committee.is_quorum_threshold_reached(&authors) {
400 trace!("BFT failed to reach quorum threshold in even round {current_round}");
401 return false;
402 }
403 if let Some(leader_certificate) = self.leader_certificate.read().as_ref()
405 && leader_certificate.round() == current_round
406 {
407 return true;
408 }
409 if self.is_timer_expired() {
411 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
412 return true;
413 }
414 false
416 }
417
418 fn is_timer_expired(&self) -> bool {
422 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
423 }
424
425 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
430 let current_round = self.storage().current_round();
432 if current_round != odd_round {
434 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
435 return false;
436 }
437 if current_round % 2 != 1 {
439 error!("BFT does not compute stakes for the leader certificate in an even round");
440 return false;
441 }
442 let current_certificates = self.storage().get_certificates_for_round(current_round);
444 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
446 Ok(committee) => committee,
447 Err(err) => {
448 let err = err.context(format!(
449 "BFT failed to retrieve the committee lookback for the odd round {current_round}"
450 ));
451 error!("{}", &flatten_error(err));
452 return false;
453 }
454 };
455 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
457 if !committee_lookback.is_quorum_threshold_reached(&authors) {
459 trace!("BFT failed reach quorum threshold in odd round {current_round}.");
460 return false;
461 }
462 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
464 return true;
466 };
467 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
469 leader_certificate.id(),
470 current_certificates,
471 &committee_lookback,
472 );
473 stake_with_leader >= committee_lookback.availability_threshold()
475 || stake_without_leader >= committee_lookback.quorum_threshold()
476 || self.is_timer_expired()
477 }
478
479 fn compute_stake_for_leader_certificate(
481 &self,
482 leader_certificate_id: Field<N>,
483 current_certificates: IndexSet<BatchCertificate<N>>,
484 current_committee: &Committee<N>,
485 ) -> (u64, u64) {
486 if current_certificates.is_empty() {
488 return (0, 0);
489 }
490
491 let mut stake_with_leader = 0u64;
493 let mut stake_without_leader = 0u64;
495 for certificate in current_certificates {
497 let stake = current_committee.get_stake(certificate.author());
499 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
501 true => stake_with_leader = stake_with_leader.saturating_add(stake),
503 false => stake_without_leader = stake_without_leader.saturating_add(stake),
505 }
506 }
507 (stake_with_leader, stake_without_leader)
509 }
510}
511
512impl<N: Network> BFT<N> {
513 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
515 &self,
516 certificate: BatchCertificate<N>,
517 ) -> Result<()> {
518 let _lock = self.lock.lock().await;
520
521 let certificate_round = certificate.round();
524
525 self.dag.write().insert(certificate);
527
528 let commit_round = certificate_round.saturating_sub(1);
530
531 if !commit_round.is_multiple_of(2) || commit_round < 2 {
535 return Ok(());
536 }
537 if commit_round <= self.dag.read().last_committed_round() {
539 return Ok(());
540 }
541
542 trace!("Checking if the leader is ready to be committed for round {commit_round}...");
544
545 let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
547 format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
548 })?;
549
550 let leader = match self.ledger().latest_leader() {
552 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
553 _ => {
554 let computed_leader = committee_lookback
556 .get_leader(commit_round)
557 .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
558
559 self.ledger().update_latest_leader(commit_round, computed_leader);
561
562 computed_leader
563 }
564 };
565
566 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
568 else {
569 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
570 return Ok(());
571 };
572 let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
574 format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
575 })?;
576
577 let certificate_committee_lookback =
579 self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
580 format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
581 })?;
582
583 let authors = certificates
585 .values()
586 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
587 true => Some(c.author()),
588 false => None,
589 })
590 .collect();
591 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
593 trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
595 return Ok(());
596 }
597
598 if IS_SYNCING {
599 info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
600 } else {
601 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
602 }
603
604 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
606 }
607
608 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
610 &self,
611 leader_certificate: BatchCertificate<N>,
612 ) -> Result<()> {
613 #[cfg(feature = "metrics")]
614 let start = std::time::Instant::now();
615 #[cfg(debug_assertions)]
616 trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
617
618 let latest_leader_round = leader_certificate.round();
620 let mut leader_certificates = vec![leader_certificate.clone()];
623 {
624 let leader_round = leader_certificate.round();
626
627 let mut current_certificate = leader_certificate;
628 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
629 {
630 let previous_committee_lookback =
632 self.ledger().get_committee_lookback_for_round(round).with_context(|| {
633 format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
634 })?;
635
636 let leader = match self.ledger().latest_leader() {
638 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
639 _ => {
640 let computed_leader = previous_committee_lookback
642 .get_leader(round)
643 .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
644
645 self.ledger().update_latest_leader(round, computed_leader);
647
648 computed_leader
649 }
650 };
651 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
653 else {
654 continue;
655 };
656 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
658 leader_certificates.push(previous_certificate.clone());
660 current_certificate = previous_certificate;
662 } else {
663 #[cfg(debug_assertions)]
664 trace!(
665 "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
666 );
667 }
668 }
669 }
670
671 for leader_certificate in leader_certificates.into_iter().rev() {
673 let leader_round = leader_certificate.round();
675 let commit_subdag = self
677 .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
678 .with_context(|| "BFT failed to order the DAG with DFS")?;
679 if !IS_SYNCING {
681 let mut transmissions = IndexMap::new();
683 let mut seen_transaction_ids = IndexSet::new();
685 let mut seen_solution_ids = IndexSet::new();
687 for certificate in commit_subdag.values().flatten() {
689 for transmission_id in certificate.transmission_ids() {
691 match transmission_id {
695 TransmissionID::Solution(solution_id, _) => {
696 if seen_solution_ids.contains(&solution_id) {
698 continue;
699 }
700 }
701 TransmissionID::Transaction(transaction_id, _) => {
702 if seen_transaction_ids.contains(transaction_id) {
704 continue;
705 }
706 }
707 TransmissionID::Ratification => {
708 bail!("Ratifications are currently not supported in the BFT.")
709 }
710 }
711 if transmissions.contains_key(transmission_id) {
713 continue;
714 }
715 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
718 continue;
719 }
720 let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
722 format!(
723 "BFT failed to retrieve transmission '{}.{}' from round {}",
724 fmt_id(transmission_id),
725 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
726 certificate.round()
727 )
728 })?;
729 match transmission_id {
731 TransmissionID::Solution(id, _) => {
732 seen_solution_ids.insert(id);
733 }
734 TransmissionID::Transaction(id, _) => {
735 seen_transaction_ids.insert(id);
736 }
737 TransmissionID::Ratification => {}
738 }
739 transmissions.insert(*transmission_id, transmission);
741 }
742 }
743 let subdag = Subdag::from(commit_subdag.clone())?;
746 let anchor_round = subdag.anchor_round();
748 let num_transmissions = transmissions.len();
750 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
752
753 ensure!(
755 anchor_round == leader_round,
756 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
757 );
758
759 if let Some(consensus_sender) = self.consensus_sender.get() {
761 let (callback_sender, callback_receiver) = oneshot::channel();
763 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
765 match callback_receiver.await {
767 Ok(Ok(())) => (), Ok(Err(err)) => {
769 let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
770 error!("{}", &flatten_error(err));
771 return Ok(());
772 }
773 Err(err) => {
774 let err: anyhow::Error = err.into();
775 let err =
776 err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
777 error!("{}", flatten_error(err));
778 return Ok(());
779 }
780 }
781 }
782
783 info!(
784 "Committing a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})"
785 );
786 }
787
788 {
790 let mut dag_write = self.dag.write();
791 let mut count = 0;
792 for certificate in commit_subdag.values().flatten() {
793 dag_write.commit(certificate, self.storage().max_gc_rounds());
794 count += 1;
795 }
796
797 trace!("Committed {count} certificates to the DAG");
798 }
799
800 #[cfg(feature = "telemetry")]
802 self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
803 }
804
805 self.storage().garbage_collect_certificates(latest_leader_round);
821
822 #[cfg(feature = "metrics")]
823 metrics::histogram(metrics::bft::COMMIT_LEADER_CERTIFICATE_LATENCY, start.elapsed().as_secs_f64());
824 Ok(())
825 }
826
827 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
829 &self,
830 leader_certificate: BatchCertificate<N>,
831 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
832 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
834 let mut already_ordered = HashSet::new();
836 let mut buffer = vec![leader_certificate];
838 while let Some(certificate) = buffer.pop() {
840 commit.entry(certificate.round()).or_default().insert(certificate.clone());
842
843 let previous_round = certificate.round().saturating_sub(1);
848 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
849 continue;
850 }
851 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
855 if already_ordered.contains(previous_certificate_id) {
857 continue;
858 }
859 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
861 continue;
862 }
863 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
865 continue;
866 }
867
868 let previous_certificate = {
870 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
872 Some(previous_certificate) => previous_certificate,
874 None => match self.storage().get_certificate(*previous_certificate_id) {
876 Some(previous_certificate) => previous_certificate,
878 None => bail!(
880 "Missing previous certificate {} for round {previous_round}",
881 fmt_id(previous_certificate_id)
882 ),
883 },
884 }
885 };
886 already_ordered.insert(previous_certificate.id());
888 buffer.push(previous_certificate);
890 }
891 }
892 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
894 Ok(commit)
896 }
897
898 fn is_linked(
900 &self,
901 previous_certificate: BatchCertificate<N>,
902 current_certificate: BatchCertificate<N>,
903 ) -> Result<bool> {
904 let mut traversal = vec![current_certificate.clone()];
906 for round in (previous_certificate.round()..current_certificate.round()).rev() {
908 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
910 bail!("BFT failed to retrieve the certificates for past round {round}");
913 };
914 traversal = certificates
916 .into_values()
917 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
918 .collect();
919 }
920 Ok(traversal.contains(&previous_certificate))
921 }
922}
923
924impl<N: Network> BFT<N> {
925 pub async fn shut_down(&self) {
927 info!("Shutting down the BFT...");
928 let _lock = self.lock.lock().await;
930 self.primary.shut_down().await;
932 }
933}
934
935#[cfg(test)]
936mod tests {
937 use crate::{
938 BFT,
939 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
940 helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
941 sync::SyncCallback,
942 };
943
944 use snarkos_account::Account;
945 use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
946 use snarkos_node_bft_storage_service::BFTMemoryService;
947 use snarkos_node_sync::BlockSync;
948 use snarkos_utilities::NodeDataDir;
949
950 use snarkvm::{
951 console::account::{Address, PrivateKey},
952 ledger::{
953 committee::{
954 Committee,
955 test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
956 },
957 narwhal::{
958 BatchCertificate,
959 batch_certificate::test_helpers::{
960 sample_batch_certificate,
961 sample_batch_certificate_for_round,
962 sample_batch_certificate_for_round_with_committee,
963 },
964 },
965 },
966 utilities::TestRng,
967 };
968
969 use anyhow::Result;
970 use indexmap::{IndexMap, IndexSet};
971 use std::sync::Arc;
972
973 type CurrentNetwork = snarkvm::console::network::MainnetV0;
974
975 fn sample_test_instance(
977 committee_round: Option<u64>,
978 max_gc_rounds: u64,
979 rng: &mut TestRng,
980 ) -> (
981 Committee<CurrentNetwork>,
982 Account<CurrentNetwork>,
983 Arc<MockLedgerService<CurrentNetwork>>,
984 Storage<CurrentNetwork>,
985 ) {
986 let committee = match committee_round {
987 Some(round) => sample_committee_for_round(round, rng),
988 None => sample_committee(rng),
989 };
990 let account = Account::new(rng).unwrap();
991 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
992 let transmissions = Arc::new(BFTMemoryService::new());
993 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
994
995 (committee, account, ledger, storage)
996 }
997
998 fn initialize_bft(
1000 account: Account<CurrentNetwork>,
1001 storage: Storage<CurrentNetwork>,
1002 ledger: Arc<MockLedgerService<CurrentNetwork>>,
1003 ) -> anyhow::Result<BFT<CurrentNetwork>> {
1004 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1006 BFT::new(
1008 account.clone(),
1009 storage.clone(),
1010 ledger.clone(),
1011 block_sync,
1012 None,
1013 &[],
1014 false,
1015 NodeDataDir::new_test(None),
1016 None,
1017 )
1018 }
1019
1020 #[test]
1021 #[tracing_test::traced_test]
1022 fn test_is_leader_quorum_odd() -> Result<()> {
1023 let rng = &mut TestRng::default();
1024
1025 let mut certificates = IndexSet::new();
1027 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1028 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1029 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1030 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1031
1032 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1034 1,
1035 vec![
1036 certificates[0].author(),
1037 certificates[1].author(),
1038 certificates[2].author(),
1039 certificates[3].author(),
1040 ],
1041 rng,
1042 );
1043
1044 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1046 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1048 let account = Account::new(rng)?;
1050 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1052 assert!(bft.is_timer_expired());
1053 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1055 assert!(!result);
1057 for certificate in certificates.iter() {
1059 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1060 }
1061 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1063 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1066 *bft.leader_certificate.write() = Some(leader_certificate);
1067 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1069 assert!(result); Ok(())
1072 }
1073
1074 #[test]
1075 #[tracing_test::traced_test]
1076 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1077 let rng = &mut TestRng::default();
1078
1079 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1081 assert_eq!(committee.starting_round(), 1);
1082 assert_eq!(storage.current_round(), 1);
1083 assert_eq!(storage.max_gc_rounds(), 10);
1084
1085 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1087 assert!(bft.is_timer_expired());
1088
1089 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1092 assert!(!result);
1093 Ok(())
1094 }
1095
1096 #[test]
1097 #[tracing_test::traced_test]
1098 fn test_is_leader_quorum_even() -> Result<()> {
1099 let rng = &mut TestRng::default();
1100
1101 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1103 assert_eq!(committee.starting_round(), 2);
1104 assert_eq!(storage.current_round(), 2);
1105 assert_eq!(storage.max_gc_rounds(), 10);
1106
1107 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1109 assert!(bft.is_timer_expired());
1110
1111 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1113 assert!(!result);
1114 Ok(())
1115 }
1116
1117 #[test]
1118 #[tracing_test::traced_test]
1119 fn test_is_even_round_ready() -> Result<()> {
1120 let rng = &mut TestRng::default();
1121
1122 let mut certificates = IndexSet::new();
1124 certificates.insert(sample_batch_certificate_for_round(2, rng));
1125 certificates.insert(sample_batch_certificate_for_round(2, rng));
1126 certificates.insert(sample_batch_certificate_for_round(2, rng));
1127 certificates.insert(sample_batch_certificate_for_round(2, rng));
1128
1129 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1131 2,
1132 vec![
1133 certificates[0].author(),
1134 certificates[1].author(),
1135 certificates[2].author(),
1136 certificates[3].author(),
1137 ],
1138 rng,
1139 );
1140
1141 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1143 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1145 let account = Account::new(rng)?;
1147
1148 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1150 assert!(bft.is_timer_expired());
1151
1152 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1154 *bft.leader_certificate.write() = Some(leader_certificate);
1155 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1156 assert!(!result);
1158 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1160 assert!(result);
1161
1162 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1164 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1166 if !bft_timer.is_timer_expired() {
1167 assert!(!result);
1168 }
1169 let leader_certificate_timeout =
1171 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1172 std::thread::sleep(leader_certificate_timeout);
1173 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1175 if bft_timer.is_timer_expired() {
1176 assert!(result);
1177 } else {
1178 assert!(!result);
1179 }
1180
1181 Ok(())
1182 }
1183
1184 #[test]
1185 #[tracing_test::traced_test]
1186 fn test_update_leader_certificate_odd() -> Result<()> {
1187 let rng = &mut TestRng::default();
1188
1189 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1191 assert_eq!(storage.max_gc_rounds(), 10);
1192
1193 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1195 assert!(bft.is_timer_expired());
1196
1197 let result = bft.update_leader_certificate_to_even_round(1);
1199 assert!(!result);
1200 Ok(())
1201 }
1202
1203 #[test]
1204 #[tracing_test::traced_test]
1205 fn test_update_leader_certificate_bad_round() -> Result<()> {
1206 let rng = &mut TestRng::default();
1207
1208 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1210 assert_eq!(storage.max_gc_rounds(), 10);
1211
1212 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1214
1215 let result = bft.update_leader_certificate_to_even_round(6);
1217 assert!(!result);
1218 Ok(())
1219 }
1220
1221 #[test]
1222 #[tracing_test::traced_test]
1223 fn test_update_leader_certificate_even() -> Result<()> {
1224 let rng = &mut TestRng::default();
1225
1226 let current_round = 3;
1228
1229 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1231 current_round,
1232 rng,
1233 );
1234
1235 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1237 2,
1238 vec![
1239 certificates[0].author(),
1240 certificates[1].author(),
1241 certificates[2].author(),
1242 certificates[3].author(),
1243 ],
1244 rng,
1245 );
1246
1247 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1249
1250 let transmissions = Arc::new(BFTMemoryService::new());
1252 let storage = Storage::new(ledger.clone(), transmissions, 10);
1253 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1254 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1255 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1256 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1257 assert_eq!(storage.current_round(), 2);
1258
1259 let leader = committee.get_leader(2).unwrap();
1261 let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1262
1263 let account = Account::new(rng)?;
1265 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1266
1267 *bft.leader_certificate.write() = Some(leader_certificate);
1269
1270 let result = bft.update_leader_certificate_to_even_round(2);
1273 assert!(result);
1274
1275 Ok(())
1276 }
1277
1278 #[tokio::test]
1279 #[tracing_test::traced_test]
1280 async fn test_order_dag_with_dfs() -> Result<()> {
1281 let rng = &mut TestRng::default();
1282
1283 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1285
1286 let previous_round = 2; let current_round = previous_round + 1;
1289
1290 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1292 current_round,
1293 rng,
1294 );
1295
1296 {
1300 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1302 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1304
1305 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1307
1308 for certificate in previous_certificates.clone() {
1310 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1311 }
1312
1313 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1315 assert!(result.is_ok());
1316 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1317 assert_eq!(candidate_certificates.len(), 1);
1318 let expected_certificates = vec![certificate.clone()];
1319 assert_eq!(
1320 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1321 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1322 );
1323 assert_eq!(candidate_certificates, expected_certificates);
1324 }
1325
1326 {
1330 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1332 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1334
1335 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1337
1338 for certificate in previous_certificates.clone() {
1340 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1341 }
1342
1343 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1345 assert!(result.is_ok());
1346 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1347 assert_eq!(candidate_certificates.len(), 5);
1348 let expected_certificates = vec![
1349 previous_certificates[0].clone(),
1350 previous_certificates[1].clone(),
1351 previous_certificates[2].clone(),
1352 previous_certificates[3].clone(),
1353 certificate,
1354 ];
1355 assert_eq!(
1356 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1357 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1358 );
1359 assert_eq!(candidate_certificates, expected_certificates);
1360 }
1361
1362 Ok(())
1363 }
1364
1365 #[test]
1366 #[tracing_test::traced_test]
1367 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1368 let rng = &mut TestRng::default();
1369
1370 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1372 assert_eq!(committee.starting_round(), 1);
1373 assert_eq!(storage.current_round(), 1);
1374 assert_eq!(storage.max_gc_rounds(), 1);
1375
1376 let previous_round = 2; let current_round = previous_round + 1;
1379
1380 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1382 current_round,
1383 rng,
1384 );
1385 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1387
1388 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1392
1393 let error_msg = format!(
1395 "Missing previous certificate {} for round {previous_round}",
1396 crate::helpers::fmt_id(previous_certificate_ids[3]),
1397 );
1398
1399 let result = bft.order_dag_with_dfs::<false>(certificate);
1401 assert!(result.is_err());
1402 assert_eq!(result.unwrap_err().to_string(), error_msg);
1403 Ok(())
1404 }
1405
1406 #[tokio::test]
1407 async fn test_bft_gc_on_commit() -> Result<()> {
1408 let rng = &mut TestRng::default();
1409
1410 let max_gc_rounds = 1;
1412 let committee_round = 0;
1413 let commit_round = 2;
1414 let current_round = commit_round + 1;
1415
1416 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1418 current_round,
1419 rng,
1420 );
1421
1422 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1424 committee_round,
1425 vec![
1426 certificates[0].author(),
1427 certificates[1].author(),
1428 certificates[2].author(),
1429 certificates[3].author(),
1430 ],
1431 rng,
1432 );
1433
1434 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1436
1437 let transmissions = Arc::new(BFTMemoryService::new());
1439 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1440 for certificate in certificates.iter() {
1442 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1443 }
1444
1445 let leader = committee.get_leader(commit_round).unwrap();
1447 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1448
1449 let account = Account::new(rng)?;
1451 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1452
1453 *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1455
1456 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1458
1459 for certificate in certificates {
1461 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1462 }
1463
1464 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1466
1467 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1469
1470 Ok(())
1471 }
1472
1473 #[tokio::test]
1474 #[tracing_test::traced_test]
1475 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1476 let rng = &mut TestRng::default();
1477
1478 let max_gc_rounds = 1;
1480 let committee_round = 0;
1481 let commit_round = 2;
1482 let current_round = commit_round + 1;
1483
1484 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1486 current_round,
1487 rng,
1488 );
1489
1490 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1492 committee_round,
1493 vec![
1494 certificates[0].author(),
1495 certificates[1].author(),
1496 certificates[2].author(),
1497 certificates[3].author(),
1498 ],
1499 rng,
1500 );
1501
1502 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1504
1505 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1507 for certificate in certificates.iter() {
1509 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1510 }
1511
1512 let leader = committee.get_leader(commit_round).unwrap();
1514 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1515
1516 let account = Account::new(rng)?;
1518 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1519
1520 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1522
1523 for certificate in certificates.clone() {
1525 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1526 }
1527
1528 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1530
1531 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1535 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1537
1538 bootup_bft.sync_dag_at_bootup(certificates.clone()).await;
1540
1541 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1543
1544 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1546 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1547
1548 for certificate in certificates {
1550 let certificate_round = certificate.round();
1551 let certificate_id = certificate.id();
1552 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1554 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1557 }
1558
1559 Ok(())
1560 }
1561
    /// Compares a BFT instance that processed all certificates directly with a second
    /// ("bootup") instance that first syncs a set of "pre-shutdown" certificates via
    /// `sync_dag_at_bootup` and then receives the "post-shutdown" certificates.
    ///
    /// Scenario built by this test:
    /// - Four validators produce certificates for rounds 0 through `commit_round + 2`.
    /// - Pre-shutdown certificates: all certificates of round 1, plus only the leader's
    ///   certificate of `commit_round`.
    /// - Post-shutdown certificates: all certificates of rounds `commit_round` through
    ///   `commit_round + 2`.
    ///
    /// After both instances commit the leader certificate of `next_round`, the test
    /// asserts that they agree on the last committed round, that both leader
    /// certificates are recently committed in both DAGs, that the checked certificates
    /// are recently committed but absent from the DAG rounds, and that the
    /// `(round, certificate count)` metadata of the ordered subdags is equal.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
        let rng = &mut TestRng::default();

        // Round parameters of the scenario.
        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let committee_round = 0;
        let commit_round = 2;
        let current_round = commit_round + 1;
        let next_round = current_round + 1;

        // Build the committee and a map from round to that round's certificates.
        let (round_to_certificates_map, committee) = {
            // Four validator keys and their addresses.
            let private_keys = [
                PrivateKey::new(rng).unwrap(),
                PrivateKey::new(rng).unwrap(),
                PrivateKey::new(rng).unwrap(),
                PrivateKey::new(rng).unwrap(),
            ];
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];
            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
                committee_round,
                addresses,
                rng,
            );
            let mut round_to_certificates_map: IndexMap<
                u64,
                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
            > = IndexMap::new();
            // Seed set of previous certificates. Its IDs are never read: rounds 0 and 1 use
            // an empty set of previous IDs, and the set is overwritten after each round.
            let mut previous_certificates = IndexSet::with_capacity(4);
            for _ in 0..4 {
                previous_certificates.insert(sample_batch_certificate(rng));
            }
            // Produce one certificate per validator for rounds 0 through `commit_round + 2`.
            for round in 0..commit_round + 3 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 reference no previous certificates.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                // All certificates of a round share transmission IDs, timestamp, and committee ID.
                let transmission_ids =
                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
                        .into_iter()
                        .collect::<IndexSet<_>>();
                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
                let committee_id = committee.id();
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
                        private_key_1,
                        round,
                        timestamp,
                        committee_id,
                        transmission_ids.clone(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Every validator other than the author signs the batch ID.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    let certificate =
                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
                    current_certificates.insert(certificate);
                }
                round_to_certificates_map.insert(round, current_certificates.clone());
                // The certificates of this round become the previous certificates of the next.
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Leaders of the commit round and of the next round.
        let leader = committee.get_leader(commit_round).unwrap();
        let next_leader = committee.get_leader(next_round).unwrap();
        // Pre-shutdown set: everything from round 1, but only the leader certificate of the commit round.
        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round {
            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
            if i == commit_round {
                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
                if let Some(c) = leader_certificate {
                    pre_shutdown_certificates.push(c.clone());
                }
                continue;
            }
            pre_shutdown_certificates.extend(certificates);
        }
        for certificate in pre_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Post-shutdown set: all certificates of rounds `commit_round` through `commit_round + 2`.
        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
            Vec::new();
        for j in commit_round..=commit_round + 2 {
            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
            post_shutdown_certificates.extend(certificate);
        }
        for certificate in post_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Fetch both leader certificates back from storage.
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();

        // First instance: processes every certificate directly.
        let account = Account::new(rng)?;
        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

        // Start from a mocked DAG whose last committed round is 0.
        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);

        // Feed the pre-shutdown certificates into the DAG.
        for certificate in pre_shutdown_certificates.clone() {
            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
        }

        // Feed the post-shutdown certificates into the DAG.
        for certificate in post_shutdown_certificates.clone() {
            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
        }
        // Record the (round, certificate count) shape of the ordered subdag, then commit.
        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();

        // Second ("bootup") instance: starts from an empty storage.
        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);

        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;

        // Sync the bootup DAG from the pre-shutdown certificates.
        bootup_bft.sync_dag_at_bootup(pre_shutdown_certificates.clone()).await;

        // Deliver the post-shutdown certificates to the bootup instance's storage and DAG.
        for certificate in post_shutdown_certificates.iter() {
            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
        }
        for certificate in post_shutdown_certificates.clone() {
            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
        }
        // Record the same subdag metadata for the bootup instance, then commit.
        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
        let commit_subdag_metadata_bootup =
            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();

        // Both instances must agree on the last committed round.
        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());

        // Both leader certificates must be recently committed in both DAGs.
        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
        assert!(
            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
        );

        // Every pre-shutdown certificate is recently committed, yet absent from the DAG rounds, in both DAGs.
        for certificate in pre_shutdown_certificates.clone() {
            let certificate_round = certificate.round();
            let certificate_id = certificate.id();
            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
        }

        // The same holds for every certificate in the bootup instance's ordered subdag.
        for certificate in committed_certificates_bootup.clone() {
            let certificate_round = certificate.round();
            let certificate_id = certificate.id();
            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
        }

        // The ordered subdags must have the same per-round certificate counts.
        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);

        Ok(())
    }
1778
    /// Checks that certificates synced through `sync_dag_at_bootup` do not reappear in
    /// a subdag ordered afterwards.
    ///
    /// Scenario built by this test:
    /// - Four validators produce certificates for rounds 0 through `commit_round + 2`.
    /// - Pre-shutdown certificates (all of round 1, plus only the leader's certificate of
    ///   `commit_round`) are inserted into storage and synced into the DAG at bootup.
    /// - Post-shutdown certificates (all of rounds `commit_round` through
    ///   `commit_round + 2`) are then inserted into storage and the DAG.
    /// - The DAG is ordered from the leader certificate of `next_round`, and the test
    ///   asserts that no pre-shutdown certificate ID appears in the result.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
        let rng = &mut TestRng::default();

        // Round parameters of the scenario.
        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let committee_round = 0;
        let commit_round = 2;
        let current_round = commit_round + 1;
        let next_round = current_round + 1;

        // Build the committee and a map from round to that round's certificates.
        let (round_to_certificates_map, committee) = {
            // Four validator keys and their addresses.
            let private_keys = [
                PrivateKey::new(rng).unwrap(),
                PrivateKey::new(rng).unwrap(),
                PrivateKey::new(rng).unwrap(),
                PrivateKey::new(rng).unwrap(),
            ];
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];
            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
                committee_round,
                addresses,
                rng,
            );
            let mut round_to_certificates_map: IndexMap<
                u64,
                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
            > = IndexMap::new();
            // Seed set of previous certificates. Its IDs are never read: rounds 0 and 1 use
            // an empty set of previous IDs, and the set is overwritten after each round.
            let mut previous_certificates = IndexSet::with_capacity(4);
            for _ in 0..4 {
                previous_certificates.insert(sample_batch_certificate(rng));
            }
            // Produce one certificate per validator for rounds 0 through `commit_round + 2`.
            for round in 0..=commit_round + 2 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 reference no previous certificates.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                // All certificates of a round share transmission IDs, timestamp, and committee ID.
                let transmission_ids =
                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
                        .into_iter()
                        .collect::<IndexSet<_>>();
                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
                let committee_id = committee.id();
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
                        private_key_1,
                        round,
                        timestamp,
                        committee_id,
                        transmission_ids.clone(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Every validator other than the author signs the batch ID.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    let certificate =
                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
                    current_certificates.insert(certificate);
                }
                round_to_certificates_map.insert(round, current_certificates.clone());
                // The certificates of this round become the previous certificates of the next.
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        // Leaders of the commit round and of the next round.
        let leader = committee.get_leader(commit_round).unwrap();
        let next_leader = committee.get_leader(next_round).unwrap();
        // Pre-shutdown set: everything from round 1, but only the leader certificate of the commit round.
        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round {
            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
            if i == commit_round {
                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
                if let Some(c) = leader_certificate {
                    pre_shutdown_certificates.push(c.clone());
                }
                continue;
            }
            pre_shutdown_certificates.extend(certificates);
        }
        for certificate in pre_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        let account = Account::new(rng)?;
        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

        // Start from a mocked DAG whose last committed round is 0, then sync the pre-shutdown certificates.
        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
        bootup_bft.sync_dag_at_bootup(pre_shutdown_certificates.clone()).await;

        // Post-shutdown set: all certificates of rounds `commit_round` through `commit_round + 2`.
        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
            Vec::new();
        for j in commit_round..=commit_round + 2 {
            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
            post_shutdown_certificates.extend(certificate);
        }
        for certificate in post_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Feed the post-shutdown certificates into the DAG.
        for certificate in post_shutdown_certificates.clone() {
            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
        }

        // Order the DAG starting from the next round's leader certificate.
        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
        let committed_certificates = commit_subdag.values().flatten();

        // No pre-shutdown certificate may show up among the ordered certificates.
        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
            for committed_certificate in committed_certificates.clone() {
                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
            }
        }
        Ok(())
    }
1929
    /// Checks that committing the leader certificate of round 4 also commits the leader
    /// certificate of round 2 when the two are linked only through a single path.
    ///
    /// Certificate graph built by this test (four authors):
    /// - Round 1: no previous certificates.
    /// - Round 2: every certificate references all round-1 certificates.
    /// - Round 3: every certificate references all round-2 certificates, except that the
    ///   round-2 leader's certificate is referenced only by the leader's own round-3
    ///   certificate.
    /// - Round 4: every certificate references all round-3 certificates.
    ///
    /// The test asserts that `is_linked` holds between the two leader certificates, and
    /// that after committing the round-4 leader certificate both leader certificates are
    /// recently committed and the last committed round is 4.
    #[test_log::test(tokio::test)]
    async fn test_commit_via_is_linked() {
        let rng = &mut TestRng::default();

        // Round parameters of the scenario.
        let committee_round = 0;
        let leader_round_1 = 2;
        let leader_round_2 = 4;
        let max_gc_rounds = 50;

        // Sample the authors' keys and addresses.
        let num_authors = 4;
        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();

        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();

        // Certificates are collected per round here and inserted into storage and the DAG at the end.
        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();

        // Round 1: one certificate per author, endorsed by all other authors, with no previous certificates.
        let round1_certs: IndexSet<_> = (0..num_authors)
            .map(|idx| {
                let author = &private_keys[idx];
                let endorsements: Vec<_> = private_keys
                    .iter()
                    .enumerate()
                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                    .collect();

                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
            })
            .collect();
        certificates_by_round.insert(1, round1_certs.clone());

        // Determine the leader of the first leader round; its certificate is captured below.
        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
        let mut leader1_certificate = None;

        // Round 2: every certificate references all round-1 certificates.
        let round2_certs: IndexSet<_> = (0..num_authors)
            .map(|idx| {
                let author = &private_keys[idx];
                let endorsements: Vec<_> = private_keys
                    .iter()
                    .enumerate()
                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                    .collect();
                let cert = sample_batch_certificate_for_round_with_committee(
                    leader_round_1,
                    round1_certs.iter().map(|c| c.id()).collect(),
                    author,
                    &endorsements[..],
                    rng,
                );

                // Remember the certificate authored by the round-2 leader.
                if cert.author() == leader1 {
                    leader1_certificate = Some(cert.clone());
                }
                cert
            })
            .collect();
        certificates_by_round.insert(leader_round_1, round2_certs.clone());

        // Round 3: only the leader's own certificate references the leader's round-2 certificate.
        let round3_certs: IndexSet<_> = (0..num_authors)
            .map(|idx| {
                let author = &private_keys[idx];
                let endorsements: Vec<_> = private_keys
                    .iter()
                    .enumerate()
                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                    .collect();

                // Drop the leader's round-2 certificate unless the current author is the leader itself.
                let previous_certificate_ids: IndexSet<_> = round2_certs
                    .iter()
                    .filter_map(|cert| {
                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
                    })
                    .collect();

                sample_batch_certificate_for_round_with_committee(
                    leader_round_1 + 1,
                    previous_certificate_ids,
                    author,
                    &endorsements[..],
                    rng,
                )
            })
            .collect();
        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());

        // Nothing has been added to the DAG yet, so nothing may be committed.
        let leader_certificate_1 = leader1_certificate.unwrap();
        assert!(
            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
            "Leader certificate 1 should not be committed yet"
        );
        assert_eq!(bft.dag.read().last_committed_round(), 0);

        // Round 4: every certificate references all round-3 certificates.
        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
        let round4_certs: IndexSet<_> = (0..num_authors)
            .map(|idx| {
                let endorsements: Vec<_> = private_keys
                    .iter()
                    .enumerate()
                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                    .collect();

                sample_batch_certificate_for_round_with_committee(
                    leader_round_2,
                    round3_certs.iter().map(|c| c.id()).collect(),
                    &private_keys[idx],
                    &endorsements[..],
                    rng,
                )
            })
            .collect();
        certificates_by_round.insert(leader_round_2, round4_certs.clone());

        // Insert every certificate into storage and the DAG, in round insertion order.
        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
            bft.update_dag::<false, false>(certificate).await.unwrap();
        }

        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();

        // The two leader certificates must be linked.
        assert!(
            bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
            "Leader certificate 1 should be linked to leader certificate 2"
        );

        // Commit the round-4 leader certificate.
        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();

        // The round-2 leader certificate must have been committed along with it.
        assert!(
            bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
            "Leader certificate for round 2 should be committed when committing at round 4"
        );

        // The round-4 leader certificate itself must be committed.
        assert!(
            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
            "Leader certificate for round 4 should be committed"
        );

        assert_eq!(bft.dag.read().last_committed_round(), 4);
    }
2080
2081 #[test_log::test(tokio::test)]
2082 async fn test_commit_via_is_linked_with_skipped_anchor() {
2083 let rng = &mut TestRng::default();
2084
2085 let committee_round = 0;
2086 let leader_round_1 = 2;
2087 let leader_round_2 = 4;
2088 let max_gc_rounds = 50;
2089
2090 let num_authors = 4;
2091 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2092 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2093
2094 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2095 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2096 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2097 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2098
2099 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2100
2101 let round1_certs: IndexSet<_> = (0..num_authors)
2103 .map(|idx| {
2104 let author = &private_keys[idx];
2105 let endorsements: Vec<_> = private_keys
2106 .iter()
2107 .enumerate()
2108 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2109 .collect();
2110
2111 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2112 })
2113 .collect();
2114 certificates_by_round.insert(1, round1_certs.clone());
2115
2116 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2117 let mut leader1_certificate = None;
2118
2119 let round2_certs: IndexSet<_> = (0..num_authors)
2120 .map(|idx| {
2121 let author = &private_keys[idx];
2122 let endorsements: Vec<_> = private_keys
2123 .iter()
2124 .enumerate()
2125 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2126 .collect();
2127 let cert = sample_batch_certificate_for_round_with_committee(
2128 leader_round_1,
2129 round1_certs.iter().map(|c| c.id()).collect(),
2130 author,
2131 &endorsements[..],
2132 rng,
2133 );
2134
2135 if cert.author() == leader1 {
2136 leader1_certificate = Some(cert.clone());
2137 }
2138 cert
2139 })
2140 .collect();
2141 certificates_by_round.insert(leader_round_1, round2_certs.clone());
2142
2143 let round3_certs: IndexSet<_> = (0..num_authors)
2144 .map(|idx| {
2145 let author = &private_keys[idx];
2146 let endorsements: Vec<_> = private_keys
2147 .iter()
2148 .enumerate()
2149 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2150 .collect();
2151
2152 let previous_certificate_ids: IndexSet<_> = round2_certs
2153 .iter()
2154 .filter_map(|cert| {
2155 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2157 })
2158 .collect();
2159
2160 sample_batch_certificate_for_round_with_committee(
2161 leader_round_1 + 1,
2162 previous_certificate_ids,
2163 author,
2164 &endorsements[..],
2165 rng,
2166 )
2167 })
2168 .collect();
2169 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2170
2171 let leader_certificate_1 = leader1_certificate.unwrap();
2173 assert!(
2174 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2175 "Leader certificate 1 should not be committed yet"
2176 );
2177
2178 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2179 let round4_certs: IndexSet<_> = (0..num_authors)
2180 .map(|idx| {
2181 let endorsements: Vec<_> = private_keys
2182 .iter()
2183 .enumerate()
2184 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2185 .collect();
2186
2187 let previous_certificate_ids: IndexSet<_> = round3_certs
2189 .iter()
2190 .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2191 .collect();
2192
2193 sample_batch_certificate_for_round_with_committee(
2194 leader_round_2,
2195 previous_certificate_ids,
2196 &private_keys[idx],
2197 &endorsements[..],
2198 rng,
2199 )
2200 })
2201 .collect();
2202 certificates_by_round.insert(leader_round_2, round4_certs.clone());
2203
2204 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2206 storage.testing_only_insert_certificate_testing_only(certificate.clone());
2207 bft.update_dag::<false, false>(certificate).await.unwrap();
2208 }
2209
2210 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2211
2212 assert!(
2213 !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2214 "Leader certificate 1 should not be linked to leader certificate 2"
2215 );
2216 assert_eq!(bft.dag.read().last_committed_round(), 0);
2217
2218 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2220
2221 assert!(
2223 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2224 "Leader certificate for round 2 should not be committed when committing at round 4"
2225 );
2226
2227 assert!(
2229 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2230 "Leader certificate for round 4 should be committed"
2231 );
2232 assert_eq!(bft.dag.read().last_committed_round(), 4);
2233 }
2234}