1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31
32use snarkos_account::Account;
33use snarkos_node_bft_ledger_service::LedgerService;
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_utilities::NodeDataDir;
36
37use snarkvm::{
38 console::account::Address,
39 ledger::{
40 block::Transaction,
41 committee::Committee,
42 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
43 puzzle::{Solution, SolutionID},
44 },
45 prelude::{Field, Network, Result, bail, ensure},
46 utilities::flatten_error,
47};
48
49use anyhow::Context;
50use colored::Colorize;
51use indexmap::{IndexMap, IndexSet};
52#[cfg(feature = "locktick")]
53use locktick::{
54 parking_lot::{Mutex, RwLock},
55 tokio::Mutex as TMutex,
56};
57#[cfg(not(feature = "locktick"))]
58use parking_lot::{Mutex, RwLock};
59use std::{
60 collections::{BTreeMap, HashSet},
61 future::Future,
62 net::SocketAddr,
63 sync::{
64 Arc,
65 atomic::{AtomicI64, Ordering},
66 },
67};
68#[cfg(not(feature = "locktick"))]
69use tokio::sync::Mutex as TMutex;
70use tokio::{
71 sync::{OnceCell, oneshot},
72 task::JoinHandle,
73};
74
75#[derive(Clone)]
76pub struct BFT<N: Network> {
77 primary: Primary<N>,
79 dag: Arc<RwLock<DAG<N>>>,
81 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
83 leader_certificate_timer: Arc<AtomicI64>,
85 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
87 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
89 lock: Arc<TMutex<()>>,
91}
92
93impl<N: Network> BFT<N> {
94 #[allow(clippy::too_many_arguments)]
96 pub fn new(
97 account: Account<N>,
98 storage: Storage<N>,
99 ledger: Arc<dyn LedgerService<N>>,
100 block_sync: Arc<BlockSync<N>>,
101 ip: Option<SocketAddr>,
102 trusted_validators: &[SocketAddr],
103 trusted_peers_only: bool,
104 node_data_dir: NodeDataDir,
105 dev: Option<u16>,
106 ) -> Result<Self> {
107 Ok(Self {
108 primary: Primary::new(
109 account,
110 storage,
111 ledger,
112 block_sync,
113 ip,
114 trusted_validators,
115 trusted_peers_only,
116 node_data_dir,
117 dev,
118 )?,
119 dag: Default::default(),
120 leader_certificate: Default::default(),
121 leader_certificate_timer: Default::default(),
122 consensus_sender: Default::default(),
123 handles: Default::default(),
124 lock: Default::default(),
125 })
126 }
127
128 pub async fn run(
133 &mut self,
134 ping: Option<Arc<Ping<N>>>,
135 consensus_sender: Option<ConsensusSender<N>>,
136 primary_sender: PrimarySender<N>,
137 primary_receiver: PrimaryReceiver<N>,
138 ) -> Result<()> {
139 info!("Starting the BFT instance...");
140 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
142 self.start_handlers(bft_receiver);
144 self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
146 if let Some(consensus_sender) = consensus_sender {
149 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
150 }
151 Ok(())
152 }
153
154 pub fn is_synced(&self) -> bool {
156 self.primary.is_synced()
157 }
158
159 pub const fn primary(&self) -> &Primary<N> {
161 &self.primary
162 }
163
164 pub const fn storage(&self) -> &Storage<N> {
166 self.primary.storage()
167 }
168
169 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
171 self.primary.ledger()
172 }
173
174 pub fn leader(&self) -> Option<Address<N>> {
176 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
177 }
178
179 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
181 &self.leader_certificate
182 }
183}
184
185impl<N: Network> BFT<N> {
186 pub fn num_unconfirmed_transmissions(&self) -> usize {
188 self.primary.num_unconfirmed_transmissions()
189 }
190
191 pub fn num_unconfirmed_ratifications(&self) -> usize {
193 self.primary.num_unconfirmed_ratifications()
194 }
195
196 pub fn num_unconfirmed_solutions(&self) -> usize {
198 self.primary.num_unconfirmed_solutions()
199 }
200
201 pub fn num_unconfirmed_transactions(&self) -> usize {
203 self.primary.num_unconfirmed_transactions()
204 }
205}
206
207impl<N: Network> BFT<N> {
208 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210 self.primary.worker_transmission_ids()
211 }
212
213 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215 self.primary.worker_transmissions()
216 }
217
218 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220 self.primary.worker_solutions()
221 }
222
223 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225 self.primary.worker_transactions()
226 }
227}
228
229impl<N: Network> BFT<N> {
230 fn update_to_next_round(&self, current_round: u64) -> bool {
232 let storage_round = self.storage().current_round();
234 if current_round < storage_round {
235 debug!(
236 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
237 );
238 return false;
239 }
240
241 let is_ready = match current_round % 2 == 0 {
243 true => self.update_leader_certificate_to_even_round(current_round),
244 false => self.is_leader_quorum_or_nonleaders_available(current_round),
245 };
246
247 #[cfg(feature = "metrics")]
248 {
249 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
250 if start > 0 {
252 let end = now();
253 let elapsed = std::time::Duration::from_secs((end - start) as u64);
254 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
255 }
256 }
257
258 if current_round % 2 == 0 {
260 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
262 if !is_ready {
264 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
265 }
266 let leader_round = leader_certificate.round();
268 match leader_round == current_round {
269 true => {
270 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
271 #[cfg(feature = "metrics")]
272 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
273 }
274 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
275 }
276 } else {
277 match is_ready {
278 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
279 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
280 }
281 }
282 }
283
284 if is_ready {
286 if let Err(err) = self
288 .storage()
289 .increment_to_next_round(current_round)
290 .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
291 {
292 warn!("{}", &flatten_error(err));
293 return false;
294 }
295 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
297 }
298
299 is_ready
300 }
301
302 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
308 let current_round = self.storage().current_round();
310 if current_round != even_round {
312 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
313 return false;
314 }
315
316 if current_round % 2 != 0 || current_round < 2 {
318 error!("BFT cannot update the leader certificate in an odd round");
319 return false;
320 }
321
322 let current_certificates = self.storage().get_certificates_for_round(current_round);
324 if current_certificates.is_empty() {
326 *self.leader_certificate.write() = None;
328 return false;
329 }
330
331 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
333 Ok(committee) => committee,
334 Err(err) => {
335 let err = err.context(format!(
336 "BFT failed to retrieve the committee lookback for the even round {current_round}"
337 ));
338 warn!("{}", &flatten_error(err));
339 return false;
340 }
341 };
342 let leader = match self.ledger().latest_leader() {
344 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
345 _ => {
346 let computed_leader = match committee_lookback.get_leader(current_round) {
348 Ok(leader) => leader,
349 Err(err) => {
350 let err =
351 err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
352 error!("{}", &flatten_error(err));
353 return false;
354 }
355 };
356
357 self.ledger().update_latest_leader(current_round, computed_leader);
359
360 computed_leader
361 }
362 };
363 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
365 *self.leader_certificate.write() = leader_certificate.cloned();
366
367 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
368 }
369
370 fn is_even_round_ready_for_next_round(
374 &self,
375 certificates: IndexSet<BatchCertificate<N>>,
376 committee: Committee<N>,
377 current_round: u64,
378 ) -> bool {
379 let authors = certificates.into_iter().map(|c| c.author()).collect();
381 if !committee.is_quorum_threshold_reached(&authors) {
383 trace!("BFT failed to reach quorum threshold in even round {current_round}");
384 return false;
385 }
386 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
388 if leader_certificate.round() == current_round {
389 return true;
390 }
391 }
392 if self.is_timer_expired() {
394 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
395 return true;
396 }
397 false
399 }
400
401 fn is_timer_expired(&self) -> bool {
405 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
406 }
407
408 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
413 let current_round = self.storage().current_round();
415 if current_round != odd_round {
417 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
418 return false;
419 }
420 if current_round % 2 != 1 {
422 error!("BFT does not compute stakes for the leader certificate in an even round");
423 return false;
424 }
425 let current_certificates = self.storage().get_certificates_for_round(current_round);
427 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
429 Ok(committee) => committee,
430 Err(err) => {
431 let err = err.context(format!(
432 "BFT failed to retrieve the committee lookback for the odd round {current_round}"
433 ));
434 error!("{}", &flatten_error(err));
435 return false;
436 }
437 };
438 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
440 if !committee_lookback.is_quorum_threshold_reached(&authors) {
442 trace!("BFT failed reach quorum threshold in odd round {current_round}.");
443 return false;
444 }
445 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
447 return true;
449 };
450 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
452 leader_certificate.id(),
453 current_certificates,
454 &committee_lookback,
455 );
456 stake_with_leader >= committee_lookback.availability_threshold()
458 || stake_without_leader >= committee_lookback.quorum_threshold()
459 || self.is_timer_expired()
460 }
461
462 fn compute_stake_for_leader_certificate(
464 &self,
465 leader_certificate_id: Field<N>,
466 current_certificates: IndexSet<BatchCertificate<N>>,
467 current_committee: &Committee<N>,
468 ) -> (u64, u64) {
469 if current_certificates.is_empty() {
471 return (0, 0);
472 }
473
474 let mut stake_with_leader = 0u64;
476 let mut stake_without_leader = 0u64;
478 for certificate in current_certificates {
480 let stake = current_committee.get_stake(certificate.author());
482 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
484 true => stake_with_leader = stake_with_leader.saturating_add(stake),
486 false => stake_without_leader = stake_without_leader.saturating_add(stake),
488 }
489 }
490 (stake_with_leader, stake_without_leader)
492 }
493}
494
495impl<N: Network> BFT<N> {
496 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
498 &self,
499 certificate: BatchCertificate<N>,
500 ) -> Result<()> {
501 let _lock = self.lock.lock().await;
503
504 let certificate_round = certificate.round();
507
508 self.dag.write().insert(certificate);
510
511 let commit_round = certificate_round.saturating_sub(1);
513
514 if !commit_round.is_multiple_of(2) || commit_round < 2 {
518 return Ok(());
519 }
520 if commit_round <= self.dag.read().last_committed_round() {
522 return Ok(());
523 }
524
525 trace!("Checking if the leader is ready to be committed for round {commit_round}...");
527
528 let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
530 format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
531 })?;
532
533 let leader = match self.ledger().latest_leader() {
535 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
536 _ => {
537 let computed_leader = committee_lookback
539 .get_leader(commit_round)
540 .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
541
542 self.ledger().update_latest_leader(commit_round, computed_leader);
544
545 computed_leader
546 }
547 };
548
549 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
551 else {
552 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
553 return Ok(());
554 };
555 let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
557 format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
558 })?;
559
560 let certificate_committee_lookback =
562 self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
563 format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
564 })?;
565
566 let authors = certificates
568 .values()
569 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
570 true => Some(c.author()),
571 false => None,
572 })
573 .collect();
574 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
576 trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
578 return Ok(());
579 }
580
581 if IS_SYNCING {
582 info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
583 } else {
584 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
585 }
586
587 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
589 }
590
591 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
593 &self,
594 leader_certificate: BatchCertificate<N>,
595 ) -> Result<()> {
596 #[cfg(debug_assertions)]
597 trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
598
599 let latest_leader_round = leader_certificate.round();
601 let mut leader_certificates = vec![leader_certificate.clone()];
604 {
605 let leader_round = leader_certificate.round();
607
608 let mut current_certificate = leader_certificate;
609 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
610 {
611 let previous_committee_lookback =
613 self.ledger().get_committee_lookback_for_round(round).with_context(|| {
614 format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
615 })?;
616
617 let leader = match self.ledger().latest_leader() {
619 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
620 _ => {
621 let computed_leader = previous_committee_lookback
623 .get_leader(round)
624 .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
625
626 self.ledger().update_latest_leader(round, computed_leader);
628
629 computed_leader
630 }
631 };
632 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
634 else {
635 continue;
636 };
637 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
639 leader_certificates.push(previous_certificate.clone());
641 current_certificate = previous_certificate;
643 } else {
644 #[cfg(debug_assertions)]
645 trace!(
646 "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
647 );
648 }
649 }
650 }
651
652 for leader_certificate in leader_certificates.into_iter().rev() {
654 let leader_round = leader_certificate.round();
656 let commit_subdag = self
658 .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
659 .with_context(|| "BFT failed to order the DAG with DFS")?;
660 if !IS_SYNCING {
662 let mut transmissions = IndexMap::new();
664 let mut seen_transaction_ids = IndexSet::new();
666 let mut seen_solution_ids = IndexSet::new();
668 for certificate in commit_subdag.values().flatten() {
670 for transmission_id in certificate.transmission_ids() {
672 match transmission_id {
676 TransmissionID::Solution(solution_id, _) => {
677 if seen_solution_ids.contains(&solution_id) {
679 continue;
680 }
681 }
682 TransmissionID::Transaction(transaction_id, _) => {
683 if seen_transaction_ids.contains(transaction_id) {
685 continue;
686 }
687 }
688 TransmissionID::Ratification => {
689 bail!("Ratifications are currently not supported in the BFT.")
690 }
691 }
692 if transmissions.contains_key(transmission_id) {
694 continue;
695 }
696 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
699 continue;
700 }
701 let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
703 format!(
704 "BFT failed to retrieve transmission '{}.{}' from round {}",
705 fmt_id(transmission_id),
706 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
707 certificate.round()
708 )
709 })?;
710 match transmission_id {
712 TransmissionID::Solution(id, _) => {
713 seen_solution_ids.insert(id);
714 }
715 TransmissionID::Transaction(id, _) => {
716 seen_transaction_ids.insert(id);
717 }
718 TransmissionID::Ratification => {}
719 }
720 transmissions.insert(*transmission_id, transmission);
722 }
723 }
724 let subdag = Subdag::from(commit_subdag.clone())?;
727 let anchor_round = subdag.anchor_round();
729 let num_transmissions = transmissions.len();
731 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
733
734 ensure!(
736 anchor_round == leader_round,
737 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
738 );
739
740 if let Some(consensus_sender) = self.consensus_sender.get() {
742 let (callback_sender, callback_receiver) = oneshot::channel();
744 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
746 match callback_receiver.await {
748 Ok(Ok(())) => (), Ok(Err(err)) => {
750 let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
751 error!("{}", &flatten_error(err));
752 return Ok(());
753 }
754 Err(err) => {
755 let err: anyhow::Error = err.into();
756 let err =
757 err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
758 error!("{}", flatten_error(err));
759 return Ok(());
760 }
761 }
762 }
763
764 info!(
765 "\n\nCommitting a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})\n",
766 );
767 }
768
769 {
771 let mut dag_write = self.dag.write();
772 let mut count = 0;
773 for certificate in commit_subdag.values().flatten() {
774 dag_write.commit(certificate, self.storage().max_gc_rounds());
775 count += 1;
776 }
777
778 trace!("Committed {count} certificates to the DAG");
779 }
780
781 #[cfg(feature = "telemetry")]
783 self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
784 }
785
786 self.storage().garbage_collect_certificates(latest_leader_round);
802
803 Ok(())
804 }
805
806 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
808 &self,
809 leader_certificate: BatchCertificate<N>,
810 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
811 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
813 let mut already_ordered = HashSet::new();
815 let mut buffer = vec![leader_certificate];
817 while let Some(certificate) = buffer.pop() {
819 commit.entry(certificate.round()).or_default().insert(certificate.clone());
821
822 let previous_round = certificate.round().saturating_sub(1);
827 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
828 continue;
829 }
830 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
834 if already_ordered.contains(previous_certificate_id) {
836 continue;
837 }
838 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
840 continue;
841 }
842 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
844 continue;
845 }
846
847 let previous_certificate = {
849 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
851 Some(previous_certificate) => previous_certificate,
853 None => match self.storage().get_certificate(*previous_certificate_id) {
855 Some(previous_certificate) => previous_certificate,
857 None => bail!(
859 "Missing previous certificate {} for round {previous_round}",
860 fmt_id(previous_certificate_id)
861 ),
862 },
863 }
864 };
865 already_ordered.insert(previous_certificate.id());
867 buffer.push(previous_certificate);
869 }
870 }
871 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
873 Ok(commit)
875 }
876
877 fn is_linked(
879 &self,
880 previous_certificate: BatchCertificate<N>,
881 current_certificate: BatchCertificate<N>,
882 ) -> Result<bool> {
883 let mut traversal = vec![current_certificate.clone()];
885 for round in (previous_certificate.round()..current_certificate.round()).rev() {
887 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
889 bail!("BFT failed to retrieve the certificates for past round {round}");
892 };
893 traversal = certificates
895 .into_values()
896 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
897 .collect();
898 }
899 Ok(traversal.contains(&previous_certificate))
900 }
901}
902
903impl<N: Network> BFT<N> {
904 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
906 let BFTReceiver {
907 mut rx_primary_round,
908 mut rx_primary_certificate,
909 mut rx_sync_bft_dag_at_bootup,
910 mut rx_sync_bft,
911 mut rx_sync_block_committed,
912 } = bft_receiver;
913
914 let self_ = self.clone();
916 self.spawn(async move {
917 while let Some((current_round, callback)) = rx_primary_round.recv().await {
918 callback.send(self_.update_to_next_round(current_round)).ok();
919 }
920 });
921
922 let self_ = self.clone();
924 self.spawn(async move {
925 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
926 let result = self_.update_dag::<true, false>(certificate).await;
928 callback.send(result).ok();
931 }
932 });
933
934 let self_ = self.clone();
936 self.spawn(async move {
937 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
938 self_.sync_bft_dag_at_bootup(certificates).await;
939 }
940 });
941
942 let self_ = self.clone();
944 self.spawn(async move {
945 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
946 let result = self_.update_dag::<true, true>(certificate).await;
948 callback.send(result).ok();
951 }
952 });
953
954 let self_ = self.clone();
960 self.spawn(async move {
961 while let Some((leader_certificate, callback)) = rx_sync_block_committed.recv().await {
962 self_.dag.write().commit(&leader_certificate, self_.storage().max_gc_rounds());
963 callback.send(Ok(())).ok();
964 }
965 });
966 }
967
968 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
975 let mut dag = self.dag.write();
977
978 for certificate in certificates {
980 dag.commit(&certificate, self.storage().max_gc_rounds());
981 }
982 }
983
984 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
986 self.handles.lock().push(tokio::spawn(future));
987 }
988
989 pub async fn shut_down(&self) {
991 info!("Shutting down the BFT...");
992 let _lock = self.lock.lock().await;
994 self.primary.shut_down().await;
996 self.handles.lock().iter().for_each(|handle| handle.abort());
998 }
999}
1000
1001#[cfg(test)]
1002mod tests {
1003 use crate::{
1004 BFT,
1005 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
1006 helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
1007 };
1008
1009 use snarkos_account::Account;
1010 use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
1011 use snarkos_node_bft_storage_service::BFTMemoryService;
1012 use snarkos_node_sync::BlockSync;
1013 use snarkos_utilities::NodeDataDir;
1014
1015 use snarkvm::{
1016 console::account::{Address, PrivateKey},
1017 ledger::{
1018 committee::{
1019 Committee,
1020 test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
1021 },
1022 narwhal::{
1023 BatchCertificate,
1024 batch_certificate::test_helpers::{
1025 sample_batch_certificate,
1026 sample_batch_certificate_for_round,
1027 sample_batch_certificate_for_round_with_committee,
1028 },
1029 },
1030 },
1031 utilities::TestRng,
1032 };
1033
1034 use anyhow::Result;
1035 use indexmap::{IndexMap, IndexSet};
1036 use std::sync::Arc;
1037
1038 type CurrentNetwork = snarkvm::console::network::MainnetV0;
1039
1040 fn sample_test_instance(
1042 committee_round: Option<u64>,
1043 max_gc_rounds: u64,
1044 rng: &mut TestRng,
1045 ) -> (
1046 Committee<CurrentNetwork>,
1047 Account<CurrentNetwork>,
1048 Arc<MockLedgerService<CurrentNetwork>>,
1049 Storage<CurrentNetwork>,
1050 ) {
1051 let committee = match committee_round {
1052 Some(round) => sample_committee_for_round(round, rng),
1053 None => sample_committee(rng),
1054 };
1055 let account = Account::new(rng).unwrap();
1056 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1057 let transmissions = Arc::new(BFTMemoryService::new());
1058 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1059
1060 (committee, account, ledger, storage)
1061 }
1062
1063 fn initialize_bft(
1065 account: Account<CurrentNetwork>,
1066 storage: Storage<CurrentNetwork>,
1067 ledger: Arc<MockLedgerService<CurrentNetwork>>,
1068 ) -> anyhow::Result<BFT<CurrentNetwork>> {
1069 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1071 BFT::new(
1073 account.clone(),
1074 storage.clone(),
1075 ledger.clone(),
1076 block_sync,
1077 None,
1078 &[],
1079 false,
1080 NodeDataDir::new_test(None),
1081 None,
1082 )
1083 }
1084
1085 #[test]
1086 #[tracing_test::traced_test]
1087 fn test_is_leader_quorum_odd() -> Result<()> {
1088 let rng = &mut TestRng::default();
1089
1090 let mut certificates = IndexSet::new();
1092 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1093 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1094 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1095 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1096
1097 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1099 1,
1100 vec![
1101 certificates[0].author(),
1102 certificates[1].author(),
1103 certificates[2].author(),
1104 certificates[3].author(),
1105 ],
1106 rng,
1107 );
1108
1109 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1111 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1113 let account = Account::new(rng)?;
1115 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1117 assert!(bft.is_timer_expired());
1118 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1120 assert!(!result);
1122 for certificate in certificates.iter() {
1124 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1125 }
1126 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1128 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1131 *bft.leader_certificate.write() = Some(leader_certificate);
1132 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1134 assert!(result); Ok(())
1137 }
1138
1139 #[test]
1140 #[tracing_test::traced_test]
1141 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1142 let rng = &mut TestRng::default();
1143
1144 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1146 assert_eq!(committee.starting_round(), 1);
1147 assert_eq!(storage.current_round(), 1);
1148 assert_eq!(storage.max_gc_rounds(), 10);
1149
1150 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1152 assert!(bft.is_timer_expired());
1153
1154 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1157 assert!(!result);
1158 Ok(())
1159 }
1160
1161 #[test]
1162 #[tracing_test::traced_test]
1163 fn test_is_leader_quorum_even() -> Result<()> {
1164 let rng = &mut TestRng::default();
1165
1166 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1168 assert_eq!(committee.starting_round(), 2);
1169 assert_eq!(storage.current_round(), 2);
1170 assert_eq!(storage.max_gc_rounds(), 10);
1171
1172 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1174 assert!(bft.is_timer_expired());
1175
1176 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1178 assert!(!result);
1179 Ok(())
1180 }
1181
1182 #[test]
1183 #[tracing_test::traced_test]
1184 fn test_is_even_round_ready() -> Result<()> {
1185 let rng = &mut TestRng::default();
1186
1187 let mut certificates = IndexSet::new();
1189 certificates.insert(sample_batch_certificate_for_round(2, rng));
1190 certificates.insert(sample_batch_certificate_for_round(2, rng));
1191 certificates.insert(sample_batch_certificate_for_round(2, rng));
1192 certificates.insert(sample_batch_certificate_for_round(2, rng));
1193
1194 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1196 2,
1197 vec![
1198 certificates[0].author(),
1199 certificates[1].author(),
1200 certificates[2].author(),
1201 certificates[3].author(),
1202 ],
1203 rng,
1204 );
1205
1206 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1208 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1210 let account = Account::new(rng)?;
1212
1213 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1215 assert!(bft.is_timer_expired());
1216
1217 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1219 *bft.leader_certificate.write() = Some(leader_certificate);
1220 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1221 assert!(!result);
1223 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1225 assert!(result);
1226
1227 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1229 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1231 if !bft_timer.is_timer_expired() {
1232 assert!(!result);
1233 }
1234 let leader_certificate_timeout =
1236 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1237 std::thread::sleep(leader_certificate_timeout);
1238 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1240 if bft_timer.is_timer_expired() {
1241 assert!(result);
1242 } else {
1243 assert!(!result);
1244 }
1245
1246 Ok(())
1247 }
1248
1249 #[test]
1250 #[tracing_test::traced_test]
1251 fn test_update_leader_certificate_odd() -> Result<()> {
1252 let rng = &mut TestRng::default();
1253
1254 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1256 assert_eq!(storage.max_gc_rounds(), 10);
1257
1258 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1260 assert!(bft.is_timer_expired());
1261
1262 let result = bft.update_leader_certificate_to_even_round(1);
1264 assert!(!result);
1265 Ok(())
1266 }
1267
1268 #[test]
1269 #[tracing_test::traced_test]
1270 fn test_update_leader_certificate_bad_round() -> Result<()> {
1271 let rng = &mut TestRng::default();
1272
1273 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1275 assert_eq!(storage.max_gc_rounds(), 10);
1276
1277 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1279
1280 let result = bft.update_leader_certificate_to_even_round(6);
1282 assert!(!result);
1283 Ok(())
1284 }
1285
1286 #[test]
1287 #[tracing_test::traced_test]
1288 fn test_update_leader_certificate_even() -> Result<()> {
1289 let rng = &mut TestRng::default();
1290
1291 let current_round = 3;
1293
1294 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1296 current_round,
1297 rng,
1298 );
1299
1300 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1302 2,
1303 vec![
1304 certificates[0].author(),
1305 certificates[1].author(),
1306 certificates[2].author(),
1307 certificates[3].author(),
1308 ],
1309 rng,
1310 );
1311
1312 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1314
1315 let transmissions = Arc::new(BFTMemoryService::new());
1317 let storage = Storage::new(ledger.clone(), transmissions, 10);
1318 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1319 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1320 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1321 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1322 assert_eq!(storage.current_round(), 2);
1323
1324 let leader = committee.get_leader(2).unwrap();
1326 let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1327
1328 let account = Account::new(rng)?;
1330 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1331
1332 *bft.leader_certificate.write() = Some(leader_certificate);
1334
1335 let result = bft.update_leader_certificate_to_even_round(2);
1338 assert!(result);
1339
1340 Ok(())
1341 }
1342
1343 #[tokio::test]
1344 #[tracing_test::traced_test]
1345 async fn test_order_dag_with_dfs() -> Result<()> {
1346 let rng = &mut TestRng::default();
1347
1348 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1350
1351 let previous_round = 2; let current_round = previous_round + 1;
1354
1355 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1357 current_round,
1358 rng,
1359 );
1360
1361 {
1365 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1367 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1369
1370 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1372
1373 for certificate in previous_certificates.clone() {
1375 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1376 }
1377
1378 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1380 assert!(result.is_ok());
1381 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1382 assert_eq!(candidate_certificates.len(), 1);
1383 let expected_certificates = vec![certificate.clone()];
1384 assert_eq!(
1385 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1386 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1387 );
1388 assert_eq!(candidate_certificates, expected_certificates);
1389 }
1390
1391 {
1395 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1397 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1399
1400 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1402
1403 for certificate in previous_certificates.clone() {
1405 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1406 }
1407
1408 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1410 assert!(result.is_ok());
1411 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1412 assert_eq!(candidate_certificates.len(), 5);
1413 let expected_certificates = vec![
1414 previous_certificates[0].clone(),
1415 previous_certificates[1].clone(),
1416 previous_certificates[2].clone(),
1417 previous_certificates[3].clone(),
1418 certificate,
1419 ];
1420 assert_eq!(
1421 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1422 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1423 );
1424 assert_eq!(candidate_certificates, expected_certificates);
1425 }
1426
1427 Ok(())
1428 }
1429
1430 #[test]
1431 #[tracing_test::traced_test]
1432 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1433 let rng = &mut TestRng::default();
1434
1435 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1437 assert_eq!(committee.starting_round(), 1);
1438 assert_eq!(storage.current_round(), 1);
1439 assert_eq!(storage.max_gc_rounds(), 1);
1440
1441 let previous_round = 2; let current_round = previous_round + 1;
1444
1445 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1447 current_round,
1448 rng,
1449 );
1450 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1452
1453 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1457
1458 let error_msg = format!(
1460 "Missing previous certificate {} for round {previous_round}",
1461 crate::helpers::fmt_id(previous_certificate_ids[3]),
1462 );
1463
1464 let result = bft.order_dag_with_dfs::<false>(certificate);
1466 assert!(result.is_err());
1467 assert_eq!(result.unwrap_err().to_string(), error_msg);
1468 Ok(())
1469 }
1470
1471 #[tokio::test]
1472 async fn test_bft_gc_on_commit() -> Result<()> {
1473 let rng = &mut TestRng::default();
1474
1475 let max_gc_rounds = 1;
1477 let committee_round = 0;
1478 let commit_round = 2;
1479 let current_round = commit_round + 1;
1480
1481 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1483 current_round,
1484 rng,
1485 );
1486
1487 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1489 committee_round,
1490 vec![
1491 certificates[0].author(),
1492 certificates[1].author(),
1493 certificates[2].author(),
1494 certificates[3].author(),
1495 ],
1496 rng,
1497 );
1498
1499 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1501
1502 let transmissions = Arc::new(BFTMemoryService::new());
1504 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1505 for certificate in certificates.iter() {
1507 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1508 }
1509
1510 let leader = committee.get_leader(commit_round).unwrap();
1512 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1513
1514 let account = Account::new(rng)?;
1516 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1517
1518 *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1520
1521 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1523
1524 for certificate in certificates {
1526 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1527 }
1528
1529 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1531
1532 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1534
1535 Ok(())
1536 }
1537
1538 #[tokio::test]
1539 #[tracing_test::traced_test]
1540 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1541 let rng = &mut TestRng::default();
1542
1543 let max_gc_rounds = 1;
1545 let committee_round = 0;
1546 let commit_round = 2;
1547 let current_round = commit_round + 1;
1548
1549 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1551 current_round,
1552 rng,
1553 );
1554
1555 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1557 committee_round,
1558 vec![
1559 certificates[0].author(),
1560 certificates[1].author(),
1561 certificates[2].author(),
1562 certificates[3].author(),
1563 ],
1564 rng,
1565 );
1566
1567 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1569
1570 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1572 for certificate in certificates.iter() {
1574 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1575 }
1576
1577 let leader = committee.get_leader(commit_round).unwrap();
1579 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1580
1581 let account = Account::new(rng)?;
1583 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1584
1585 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1587
1588 for certificate in certificates.clone() {
1590 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1591 }
1592
1593 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1595
1596 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1600 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1602
1603 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1605
1606 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1608
1609 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1611 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1612
1613 for certificate in certificates {
1615 let certificate_round = certificate.round();
1616 let certificate_id = certificate.id();
1617 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1619 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1622 }
1623
1624 Ok(())
1625 }
1626
1627 #[tokio::test]
1628 #[tracing_test::traced_test]
1629 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1630 let rng = &mut TestRng::default();
1637
1638 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1640 let committee_round = 0;
1641 let commit_round = 2;
1642 let current_round = commit_round + 1;
1643 let next_round = current_round + 1;
1644
1645 let (round_to_certificates_map, committee) = {
1647 let private_keys = vec![
1648 PrivateKey::new(rng).unwrap(),
1649 PrivateKey::new(rng).unwrap(),
1650 PrivateKey::new(rng).unwrap(),
1651 PrivateKey::new(rng).unwrap(),
1652 ];
1653 let addresses = vec![
1654 Address::try_from(private_keys[0])?,
1655 Address::try_from(private_keys[1])?,
1656 Address::try_from(private_keys[2])?,
1657 Address::try_from(private_keys[3])?,
1658 ];
1659 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1660 committee_round,
1661 addresses,
1662 rng,
1663 );
1664 let mut round_to_certificates_map: IndexMap<
1666 u64,
1667 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1668 > = IndexMap::new();
1669 let mut previous_certificates = IndexSet::with_capacity(4);
1670 for _ in 0..4 {
1672 previous_certificates.insert(sample_batch_certificate(rng));
1673 }
1674 for round in 0..commit_round + 3 {
1675 let mut current_certificates = IndexSet::new();
1676 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1677 IndexSet::new()
1678 } else {
1679 previous_certificates.iter().map(|c| c.id()).collect()
1680 };
1681 let transmission_ids =
1682 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1683 .into_iter()
1684 .collect::<IndexSet<_>>();
1685 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1686 let committee_id = committee.id();
1687 for (i, private_key_1) in private_keys.iter().enumerate() {
1688 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1689 private_key_1,
1690 round,
1691 timestamp,
1692 committee_id,
1693 transmission_ids.clone(),
1694 previous_certificate_ids.clone(),
1695 rng,
1696 )
1697 .unwrap();
1698 let mut signatures = IndexSet::with_capacity(4);
1699 for (j, private_key_2) in private_keys.iter().enumerate() {
1700 if i != j {
1701 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1702 }
1703 }
1704 let certificate =
1705 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1706 current_certificates.insert(certificate);
1707 }
1708 round_to_certificates_map.insert(round, current_certificates.clone());
1710 previous_certificates = current_certificates.clone();
1711 }
1712 (round_to_certificates_map, committee)
1713 };
1714
1715 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1717 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1719 let leader = committee.get_leader(commit_round).unwrap();
1721 let next_leader = committee.get_leader(next_round).unwrap();
1722 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1724 for i in 1..=commit_round {
1725 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1726 if i == commit_round {
1727 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1729 if let Some(c) = leader_certificate {
1730 pre_shutdown_certificates.push(c.clone());
1731 }
1732 continue;
1733 }
1734 pre_shutdown_certificates.extend(certificates);
1735 }
1736 for certificate in pre_shutdown_certificates.iter() {
1737 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1738 }
1739 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1741 Vec::new();
1742 for j in commit_round..=commit_round + 2 {
1743 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1744 post_shutdown_certificates.extend(certificate);
1745 }
1746 for certificate in post_shutdown_certificates.iter() {
1747 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1748 }
1749 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1751 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1752
1753 let account = Account::new(rng)?;
1755 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1756
1757 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1759
1760 for certificate in pre_shutdown_certificates.clone() {
1762 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1763 }
1764
1765 for certificate in post_shutdown_certificates.clone() {
1767 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1768 }
1769 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1771 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1772 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1773
1774 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1778
1779 let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1781
1782 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1784
1785 for certificate in post_shutdown_certificates.iter() {
1787 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1788 }
1789 for certificate in post_shutdown_certificates.clone() {
1790 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1791 }
1792 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1794 let commit_subdag_metadata_bootup =
1795 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1796 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1797 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1798
1799 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1803
1804 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1806 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1807 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1808 assert!(
1809 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1810 );
1811
1812 for certificate in pre_shutdown_certificates.clone() {
1814 let certificate_round = certificate.round();
1815 let certificate_id = certificate.id();
1816 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1818 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1819 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1822 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1823 }
1824
1825 for certificate in committed_certificates_bootup.clone() {
1827 let certificate_round = certificate.round();
1828 let certificate_id = certificate.id();
1829 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1831 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1832 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1835 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1836 }
1837
1838 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1840
1841 Ok(())
1842 }
1843
1844 #[tokio::test]
1845 #[tracing_test::traced_test]
1846 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1847 let rng = &mut TestRng::default();
1854
1855 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1857 let committee_round = 0;
1858 let commit_round = 2;
1859 let current_round = commit_round + 1;
1860 let next_round = current_round + 1;
1861
1862 let (round_to_certificates_map, committee) = {
1864 let private_keys = vec![
1865 PrivateKey::new(rng).unwrap(),
1866 PrivateKey::new(rng).unwrap(),
1867 PrivateKey::new(rng).unwrap(),
1868 PrivateKey::new(rng).unwrap(),
1869 ];
1870 let addresses = vec![
1871 Address::try_from(private_keys[0])?,
1872 Address::try_from(private_keys[1])?,
1873 Address::try_from(private_keys[2])?,
1874 Address::try_from(private_keys[3])?,
1875 ];
1876 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1877 committee_round,
1878 addresses,
1879 rng,
1880 );
1881 let mut round_to_certificates_map: IndexMap<
1883 u64,
1884 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1885 > = IndexMap::new();
1886 let mut previous_certificates = IndexSet::with_capacity(4);
1887 for _ in 0..4 {
1889 previous_certificates.insert(sample_batch_certificate(rng));
1890 }
1891 for round in 0..=commit_round + 2 {
1892 let mut current_certificates = IndexSet::new();
1893 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1894 IndexSet::new()
1895 } else {
1896 previous_certificates.iter().map(|c| c.id()).collect()
1897 };
1898 let transmission_ids =
1899 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1900 .into_iter()
1901 .collect::<IndexSet<_>>();
1902 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1903 let committee_id = committee.id();
1904 for (i, private_key_1) in private_keys.iter().enumerate() {
1905 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1906 private_key_1,
1907 round,
1908 timestamp,
1909 committee_id,
1910 transmission_ids.clone(),
1911 previous_certificate_ids.clone(),
1912 rng,
1913 )
1914 .unwrap();
1915 let mut signatures = IndexSet::with_capacity(4);
1916 for (j, private_key_2) in private_keys.iter().enumerate() {
1917 if i != j {
1918 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1919 }
1920 }
1921 let certificate =
1922 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1923 current_certificates.insert(certificate);
1924 }
1925 round_to_certificates_map.insert(round, current_certificates.clone());
1927 previous_certificates = current_certificates.clone();
1928 }
1929 (round_to_certificates_map, committee)
1930 };
1931
1932 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1934 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1936 let leader = committee.get_leader(commit_round).unwrap();
1938 let next_leader = committee.get_leader(next_round).unwrap();
1939 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1941 for i in 1..=commit_round {
1942 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1943 if i == commit_round {
1944 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1946 if let Some(c) = leader_certificate {
1947 pre_shutdown_certificates.push(c.clone());
1948 }
1949 continue;
1950 }
1951 pre_shutdown_certificates.extend(certificates);
1952 }
1953 for certificate in pre_shutdown_certificates.iter() {
1954 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1955 }
1956 let account = Account::new(rng)?;
1958 let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1959
1960 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1962 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1964
1965 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1967 Vec::new();
1968 for j in commit_round..=commit_round + 2 {
1969 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1970 post_shutdown_certificates.extend(certificate);
1971 }
1972 for certificate in post_shutdown_certificates.iter() {
1973 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1974 }
1975
1976 for certificate in post_shutdown_certificates.clone() {
1978 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1979 }
1980
1981 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1983 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1984 let committed_certificates = commit_subdag.values().flatten();
1985
1986 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1988 for committed_certificate in committed_certificates.clone() {
1989 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1990 }
1991 }
1992 Ok(())
1993 }
1994
1995 #[test_log::test(tokio::test)]
1997 async fn test_commit_via_is_linked() {
1998 let rng = &mut TestRng::default();
1999
2000 let committee_round = 0;
2001 let leader_round_1 = 2;
2002 let leader_round_2 = 4; let max_gc_rounds = 50;
2004
2005 let num_authors = 4;
2007 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2008 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2009
2010 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2011 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2012 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2013 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2014
2015 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2016
2017 let round1_certs: IndexSet<_> = (0..num_authors)
2019 .map(|idx| {
2020 let author = &private_keys[idx];
2021 let endorsements: Vec<_> = private_keys
2022 .iter()
2023 .enumerate()
2024 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2025 .collect();
2026
2027 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2028 })
2029 .collect();
2030 certificates_by_round.insert(1, round1_certs.clone());
2031
2032 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2033 let mut leader1_certificate = None;
2034
2035 let round2_certs: IndexSet<_> = (0..num_authors)
2036 .map(|idx| {
2037 let author = &private_keys[idx];
2038 let endorsements: Vec<_> = private_keys
2039 .iter()
2040 .enumerate()
2041 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2042 .collect();
2043 let cert = sample_batch_certificate_for_round_with_committee(
2044 leader_round_1,
2045 round1_certs.iter().map(|c| c.id()).collect(),
2046 author,
2047 &endorsements[..],
2048 rng,
2049 );
2050
2051 if cert.author() == leader1 {
2052 leader1_certificate = Some(cert.clone());
2053 }
2054 cert
2055 })
2056 .collect();
2057 certificates_by_round.insert(leader_round_1, round2_certs.clone());
2058
2059 let round3_certs: IndexSet<_> = (0..num_authors)
2060 .map(|idx| {
2061 let author = &private_keys[idx];
2062 let endorsements: Vec<_> = private_keys
2063 .iter()
2064 .enumerate()
2065 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2066 .collect();
2067
2068 let previous_certificate_ids: IndexSet<_> = round2_certs
2069 .iter()
2070 .filter_map(|cert| {
2071 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2073 })
2074 .collect();
2075
2076 sample_batch_certificate_for_round_with_committee(
2077 leader_round_1 + 1,
2078 previous_certificate_ids,
2079 author,
2080 &endorsements[..],
2081 rng,
2082 )
2083 })
2084 .collect();
2085 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2086
2087 let leader_certificate_1 = leader1_certificate.unwrap();
2089 assert!(
2090 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2091 "Leader certificate 1 should not be committed yet"
2092 );
2093 assert_eq!(bft.dag.read().last_committed_round(), 0);
2094
2095 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2096 let round4_certs: IndexSet<_> = (0..num_authors)
2097 .map(|idx| {
2098 let endorsements: Vec<_> = private_keys
2099 .iter()
2100 .enumerate()
2101 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2102 .collect();
2103
2104 sample_batch_certificate_for_round_with_committee(
2105 leader_round_2,
2106 round3_certs.iter().map(|c| c.id()).collect(),
2107 &private_keys[idx],
2108 &endorsements[..],
2109 rng,
2110 )
2111 })
2112 .collect();
2113 certificates_by_round.insert(leader_round_2, round4_certs.clone());
2114
2115 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2117 storage.testing_only_insert_certificate_testing_only(certificate.clone());
2118 bft.update_dag::<false, false>(certificate).await.unwrap();
2119 }
2120
2121 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2122
2123 assert!(
2124 bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2125 "Leader certificate 1 should be linked to leader certificate 2"
2126 );
2127
2128 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2130
2131 assert!(
2133 bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2134 "Leader certificate for round 2 should be committed when committing at round 4"
2135 );
2136
2137 assert!(
2139 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2140 "Leader certificate for round 4 should be committed"
2141 );
2142
2143 assert_eq!(bft.dag.read().last_committed_round(), 4);
2144 }
2145
2146 #[test_log::test(tokio::test)]
2147 async fn test_commit_via_is_linked_with_skipped_anchor() {
2148 let rng = &mut TestRng::default();
2149
2150 let committee_round = 0;
2151 let leader_round_1 = 2;
2152 let leader_round_2 = 4;
2153 let max_gc_rounds = 50;
2154
2155 let num_authors = 4;
2156 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2157 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2158
2159 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2160 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2161 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2162 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2163
2164 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2165
2166 let round1_certs: IndexSet<_> = (0..num_authors)
2168 .map(|idx| {
2169 let author = &private_keys[idx];
2170 let endorsements: Vec<_> = private_keys
2171 .iter()
2172 .enumerate()
2173 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2174 .collect();
2175
2176 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2177 })
2178 .collect();
2179 certificates_by_round.insert(1, round1_certs.clone());
2180
2181 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2182 let mut leader1_certificate = None;
2183
2184 let round2_certs: IndexSet<_> = (0..num_authors)
2185 .map(|idx| {
2186 let author = &private_keys[idx];
2187 let endorsements: Vec<_> = private_keys
2188 .iter()
2189 .enumerate()
2190 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2191 .collect();
2192 let cert = sample_batch_certificate_for_round_with_committee(
2193 leader_round_1,
2194 round1_certs.iter().map(|c| c.id()).collect(),
2195 author,
2196 &endorsements[..],
2197 rng,
2198 );
2199
2200 if cert.author() == leader1 {
2201 leader1_certificate = Some(cert.clone());
2202 }
2203 cert
2204 })
2205 .collect();
2206 certificates_by_round.insert(leader_round_1, round2_certs.clone());
2207
2208 let round3_certs: IndexSet<_> = (0..num_authors)
2209 .map(|idx| {
2210 let author = &private_keys[idx];
2211 let endorsements: Vec<_> = private_keys
2212 .iter()
2213 .enumerate()
2214 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2215 .collect();
2216
2217 let previous_certificate_ids: IndexSet<_> = round2_certs
2218 .iter()
2219 .filter_map(|cert| {
2220 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2222 })
2223 .collect();
2224
2225 sample_batch_certificate_for_round_with_committee(
2226 leader_round_1 + 1,
2227 previous_certificate_ids,
2228 author,
2229 &endorsements[..],
2230 rng,
2231 )
2232 })
2233 .collect();
2234 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2235
2236 let leader_certificate_1 = leader1_certificate.unwrap();
2238 assert!(
2239 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2240 "Leader certificate 1 should not be committed yet"
2241 );
2242
2243 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2244 let round4_certs: IndexSet<_> = (0..num_authors)
2245 .map(|idx| {
2246 let endorsements: Vec<_> = private_keys
2247 .iter()
2248 .enumerate()
2249 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2250 .collect();
2251
2252 let previous_certificate_ids: IndexSet<_> = round3_certs
2254 .iter()
2255 .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2256 .collect();
2257
2258 sample_batch_certificate_for_round_with_committee(
2259 leader_round_2,
2260 previous_certificate_ids,
2261 &private_keys[idx],
2262 &endorsements[..],
2263 rng,
2264 )
2265 })
2266 .collect();
2267 certificates_by_round.insert(leader_round_2, round4_certs.clone());
2268
2269 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2271 storage.testing_only_insert_certificate_testing_only(certificate.clone());
2272 bft.update_dag::<false, false>(certificate).await.unwrap();
2273 }
2274
2275 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2276
2277 assert!(
2278 !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2279 "Leader certificate 1 should not be linked to leader certificate 2"
2280 );
2281 assert_eq!(bft.dag.read().last_committed_round(), 0);
2282
2283 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2285
2286 assert!(
2288 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2289 "Leader certificate for round 2 should not be committed when committing at round 4"
2290 );
2291
2292 assert!(
2294 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2295 "Leader certificate for round 4 should be committed"
2296 );
2297 assert_eq!(bft.dag.read().last_committed_round(), 4);
2298 }
2299}