1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31
32use snarkos_account::Account;
33use snarkos_node_bft_ledger_service::LedgerService;
34use snarkos_node_sync::{BlockSync, Ping};
35use snarkos_utilities::NodeDataDir;
36
37use snarkvm::{
38 console::account::Address,
39 ledger::{
40 block::Transaction,
41 committee::Committee,
42 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
43 puzzle::{Solution, SolutionID},
44 },
45 prelude::{Field, Network, Result, bail, ensure},
46 utilities::flatten_error,
47};
48
49use anyhow::Context;
50use colored::Colorize;
51use indexmap::{IndexMap, IndexSet};
52#[cfg(feature = "locktick")]
53use locktick::{
54 parking_lot::{Mutex, RwLock},
55 tokio::Mutex as TMutex,
56};
57#[cfg(not(feature = "locktick"))]
58use parking_lot::{Mutex, RwLock};
59use std::{
60 collections::{BTreeMap, HashSet},
61 future::Future,
62 net::SocketAddr,
63 sync::{
64 Arc,
65 atomic::{AtomicI64, Ordering},
66 },
67};
68#[cfg(not(feature = "locktick"))]
69use tokio::sync::Mutex as TMutex;
70use tokio::{
71 sync::{OnceCell, oneshot},
72 task::JoinHandle,
73};
74
75#[derive(Clone)]
76pub struct BFT<N: Network> {
77 primary: Primary<N>,
79 dag: Arc<RwLock<DAG<N>>>,
81 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
83 leader_certificate_timer: Arc<AtomicI64>,
85 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
87 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
89 lock: Arc<TMutex<()>>,
91}
92
93impl<N: Network> BFT<N> {
94 #[allow(clippy::too_many_arguments)]
96 pub fn new(
97 account: Account<N>,
98 storage: Storage<N>,
99 ledger: Arc<dyn LedgerService<N>>,
100 block_sync: Arc<BlockSync<N>>,
101 ip: Option<SocketAddr>,
102 trusted_validators: &[SocketAddr],
103 trusted_peers_only: bool,
104 node_data_dir: NodeDataDir,
105 dev: Option<u16>,
106 ) -> Result<Self> {
107 Ok(Self {
108 primary: Primary::new(
109 account,
110 storage,
111 ledger,
112 block_sync,
113 ip,
114 trusted_validators,
115 trusted_peers_only,
116 node_data_dir,
117 dev,
118 )?,
119 dag: Default::default(),
120 leader_certificate: Default::default(),
121 leader_certificate_timer: Default::default(),
122 consensus_sender: Default::default(),
123 handles: Default::default(),
124 lock: Default::default(),
125 })
126 }
127
128 pub async fn run(
133 &mut self,
134 ping: Option<Arc<Ping<N>>>,
135 consensus_sender: Option<ConsensusSender<N>>,
136 primary_sender: PrimarySender<N>,
137 primary_receiver: PrimaryReceiver<N>,
138 ) -> Result<()> {
139 info!("Starting the BFT instance...");
140 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
142 self.start_handlers(bft_receiver);
144 self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
146 if let Some(consensus_sender) = consensus_sender {
149 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
150 }
151 Ok(())
152 }
153
154 pub fn is_synced(&self) -> bool {
156 self.primary.is_synced()
157 }
158
159 pub const fn primary(&self) -> &Primary<N> {
161 &self.primary
162 }
163
164 pub const fn storage(&self) -> &Storage<N> {
166 self.primary.storage()
167 }
168
169 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
171 self.primary.ledger()
172 }
173
174 pub fn leader(&self) -> Option<Address<N>> {
176 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
177 }
178
179 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
181 &self.leader_certificate
182 }
183}
184
185impl<N: Network> BFT<N> {
186 pub fn num_unconfirmed_transmissions(&self) -> usize {
188 self.primary.num_unconfirmed_transmissions()
189 }
190
191 pub fn num_unconfirmed_ratifications(&self) -> usize {
193 self.primary.num_unconfirmed_ratifications()
194 }
195
196 pub fn num_unconfirmed_solutions(&self) -> usize {
198 self.primary.num_unconfirmed_solutions()
199 }
200
201 pub fn num_unconfirmed_transactions(&self) -> usize {
203 self.primary.num_unconfirmed_transactions()
204 }
205}
206
207impl<N: Network> BFT<N> {
208 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210 self.primary.worker_transmission_ids()
211 }
212
213 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215 self.primary.worker_transmissions()
216 }
217
218 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220 self.primary.worker_solutions()
221 }
222
223 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225 self.primary.worker_transactions()
226 }
227}
228
229impl<N: Network> BFT<N> {
230 fn update_to_next_round(&self, current_round: u64) -> bool {
232 let storage_round = self.storage().current_round();
234 if current_round < storage_round {
235 debug!(
236 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
237 );
238 return false;
239 }
240
241 let is_ready = match current_round % 2 == 0 {
243 true => self.update_leader_certificate_to_even_round(current_round),
244 false => self.is_leader_quorum_or_nonleaders_available(current_round),
245 };
246
247 #[cfg(feature = "metrics")]
248 {
249 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
250 if start > 0 {
252 let end = now();
253 let elapsed = std::time::Duration::from_secs((end - start) as u64);
254 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
255 }
256 }
257
258 if current_round % 2 == 0 {
260 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
262 if !is_ready {
264 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
265 }
266 let leader_round = leader_certificate.round();
268 match leader_round == current_round {
269 true => {
270 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
271 #[cfg(feature = "metrics")]
272 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
273 }
274 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
275 }
276 } else {
277 match is_ready {
278 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
279 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
280 }
281 }
282 }
283
284 if is_ready {
286 if let Err(err) = self
288 .storage()
289 .increment_to_next_round(current_round)
290 .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
291 {
292 warn!("{}", &flatten_error(err));
293 return false;
294 }
295 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
297 }
298
299 is_ready
300 }
301
302 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
308 let current_round = self.storage().current_round();
310 if current_round != even_round {
312 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
313 return false;
314 }
315
316 if current_round % 2 != 0 || current_round < 2 {
318 error!("BFT cannot update the leader certificate in an odd round");
319 return false;
320 }
321
322 let current_certificates = self.storage().get_certificates_for_round(current_round);
324 if current_certificates.is_empty() {
326 *self.leader_certificate.write() = None;
328 return false;
329 }
330
331 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
333 Ok(committee) => committee,
334 Err(err) => {
335 let err = err.context(format!(
336 "BFT failed to retrieve the committee lookback for the even round {current_round}"
337 ));
338 warn!("{}", &flatten_error(err));
339 return false;
340 }
341 };
342 let leader = match self.ledger().latest_leader() {
344 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
345 _ => {
346 let computed_leader = match committee_lookback.get_leader(current_round) {
348 Ok(leader) => leader,
349 Err(err) => {
350 let err =
351 err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
352 error!("{}", &flatten_error(err));
353 return false;
354 }
355 };
356
357 self.ledger().update_latest_leader(current_round, computed_leader);
359
360 computed_leader
361 }
362 };
363 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
365 *self.leader_certificate.write() = leader_certificate.cloned();
366
367 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
368 }
369
370 fn is_even_round_ready_for_next_round(
374 &self,
375 certificates: IndexSet<BatchCertificate<N>>,
376 committee: Committee<N>,
377 current_round: u64,
378 ) -> bool {
379 let authors = certificates.into_iter().map(|c| c.author()).collect();
381 if !committee.is_quorum_threshold_reached(&authors) {
383 trace!("BFT failed to reach quorum threshold in even round {current_round}");
384 return false;
385 }
386 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
388 if leader_certificate.round() == current_round {
389 return true;
390 }
391 }
392 if self.is_timer_expired() {
394 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
395 return true;
396 }
397 false
399 }
400
401 fn is_timer_expired(&self) -> bool {
405 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
406 }
407
408 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
413 let current_round = self.storage().current_round();
415 if current_round != odd_round {
417 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
418 return false;
419 }
420 if current_round % 2 != 1 {
422 error!("BFT does not compute stakes for the leader certificate in an even round");
423 return false;
424 }
425 let current_certificates = self.storage().get_certificates_for_round(current_round);
427 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
429 Ok(committee) => committee,
430 Err(err) => {
431 let err = err.context(format!(
432 "BFT failed to retrieve the committee lookback for the odd round {current_round}"
433 ));
434 error!("{}", &flatten_error(err));
435 return false;
436 }
437 };
438 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
440 if !committee_lookback.is_quorum_threshold_reached(&authors) {
442 trace!("BFT failed reach quorum threshold in odd round {current_round}.");
443 return false;
444 }
445 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
447 return true;
449 };
450 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
452 leader_certificate.id(),
453 current_certificates,
454 &committee_lookback,
455 );
456 stake_with_leader >= committee_lookback.availability_threshold()
458 || stake_without_leader >= committee_lookback.quorum_threshold()
459 || self.is_timer_expired()
460 }
461
462 fn compute_stake_for_leader_certificate(
464 &self,
465 leader_certificate_id: Field<N>,
466 current_certificates: IndexSet<BatchCertificate<N>>,
467 current_committee: &Committee<N>,
468 ) -> (u64, u64) {
469 if current_certificates.is_empty() {
471 return (0, 0);
472 }
473
474 let mut stake_with_leader = 0u64;
476 let mut stake_without_leader = 0u64;
478 for certificate in current_certificates {
480 let stake = current_committee.get_stake(certificate.author());
482 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
484 true => stake_with_leader = stake_with_leader.saturating_add(stake),
486 false => stake_without_leader = stake_without_leader.saturating_add(stake),
488 }
489 }
490 (stake_with_leader, stake_without_leader)
492 }
493}
494
495impl<N: Network> BFT<N> {
496 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
498 &self,
499 certificate: BatchCertificate<N>,
500 ) -> Result<()> {
501 let _lock = self.lock.lock().await;
503
504 let certificate_round = certificate.round();
507
508 self.dag.write().insert(certificate);
510
511 let commit_round = certificate_round.saturating_sub(1);
513
514 if !commit_round.is_multiple_of(2) || commit_round < 2 {
518 return Ok(());
519 }
520 if commit_round <= self.dag.read().last_committed_round() {
522 return Ok(());
523 }
524
525 trace!("Checking if the leader is ready to be committed for round {commit_round}...");
527
528 let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
530 format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
531 })?;
532
533 let leader = match self.ledger().latest_leader() {
535 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
536 _ => {
537 let computed_leader = committee_lookback
539 .get_leader(commit_round)
540 .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
541
542 self.ledger().update_latest_leader(commit_round, computed_leader);
544
545 computed_leader
546 }
547 };
548
549 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
551 else {
552 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
553 return Ok(());
554 };
555 let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
557 format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
558 })?;
559
560 let certificate_committee_lookback =
562 self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
563 format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
564 })?;
565
566 let authors = certificates
568 .values()
569 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
570 true => Some(c.author()),
571 false => None,
572 })
573 .collect();
574 if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
576 trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
578 return Ok(());
579 }
580
581 if IS_SYNCING {
582 info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
583 } else {
584 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
585 }
586
587 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
589 }
590
591 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
593 &self,
594 leader_certificate: BatchCertificate<N>,
595 ) -> Result<()> {
596 #[cfg(feature = "metrics")]
597 let start = std::time::Instant::now();
598 #[cfg(debug_assertions)]
599 trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
600
601 let latest_leader_round = leader_certificate.round();
603 let mut leader_certificates = vec![leader_certificate.clone()];
606 {
607 let leader_round = leader_certificate.round();
609
610 let mut current_certificate = leader_certificate;
611 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
612 {
613 let previous_committee_lookback =
615 self.ledger().get_committee_lookback_for_round(round).with_context(|| {
616 format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
617 })?;
618
619 let leader = match self.ledger().latest_leader() {
621 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
622 _ => {
623 let computed_leader = previous_committee_lookback
625 .get_leader(round)
626 .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
627
628 self.ledger().update_latest_leader(round, computed_leader);
630
631 computed_leader
632 }
633 };
634 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
636 else {
637 continue;
638 };
639 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
641 leader_certificates.push(previous_certificate.clone());
643 current_certificate = previous_certificate;
645 } else {
646 #[cfg(debug_assertions)]
647 trace!(
648 "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
649 );
650 }
651 }
652 }
653
654 for leader_certificate in leader_certificates.into_iter().rev() {
656 let leader_round = leader_certificate.round();
658 let commit_subdag = self
660 .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
661 .with_context(|| "BFT failed to order the DAG with DFS")?;
662 if !IS_SYNCING {
664 let mut transmissions = IndexMap::new();
666 let mut seen_transaction_ids = IndexSet::new();
668 let mut seen_solution_ids = IndexSet::new();
670 for certificate in commit_subdag.values().flatten() {
672 for transmission_id in certificate.transmission_ids() {
674 match transmission_id {
678 TransmissionID::Solution(solution_id, _) => {
679 if seen_solution_ids.contains(&solution_id) {
681 continue;
682 }
683 }
684 TransmissionID::Transaction(transaction_id, _) => {
685 if seen_transaction_ids.contains(transaction_id) {
687 continue;
688 }
689 }
690 TransmissionID::Ratification => {
691 bail!("Ratifications are currently not supported in the BFT.")
692 }
693 }
694 if transmissions.contains_key(transmission_id) {
696 continue;
697 }
698 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
701 continue;
702 }
703 let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
705 format!(
706 "BFT failed to retrieve transmission '{}.{}' from round {}",
707 fmt_id(transmission_id),
708 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
709 certificate.round()
710 )
711 })?;
712 match transmission_id {
714 TransmissionID::Solution(id, _) => {
715 seen_solution_ids.insert(id);
716 }
717 TransmissionID::Transaction(id, _) => {
718 seen_transaction_ids.insert(id);
719 }
720 TransmissionID::Ratification => {}
721 }
722 transmissions.insert(*transmission_id, transmission);
724 }
725 }
726 let subdag = Subdag::from(commit_subdag.clone())?;
729 let anchor_round = subdag.anchor_round();
731 let num_transmissions = transmissions.len();
733 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
735
736 ensure!(
738 anchor_round == leader_round,
739 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
740 );
741
742 if let Some(consensus_sender) = self.consensus_sender.get() {
744 let (callback_sender, callback_receiver) = oneshot::channel();
746 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
748 match callback_receiver.await {
750 Ok(Ok(())) => (), Ok(Err(err)) => {
752 let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
753 error!("{}", &flatten_error(err));
754 return Ok(());
755 }
756 Err(err) => {
757 let err: anyhow::Error = err.into();
758 let err =
759 err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
760 error!("{}", flatten_error(err));
761 return Ok(());
762 }
763 }
764 }
765
766 info!(
767 "\n\nCommitting a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})\n",
768 );
769 }
770
771 {
773 let mut dag_write = self.dag.write();
774 let mut count = 0;
775 for certificate in commit_subdag.values().flatten() {
776 dag_write.commit(certificate, self.storage().max_gc_rounds());
777 count += 1;
778 }
779
780 trace!("Committed {count} certificates to the DAG");
781 }
782
783 #[cfg(feature = "telemetry")]
785 self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
786 }
787
788 self.storage().garbage_collect_certificates(latest_leader_round);
804
805 #[cfg(feature = "metrics")]
806 metrics::histogram(metrics::bft::COMMIT_LEADER_CERTIFICATE_LATENCY, start.elapsed().as_secs_f64());
807 Ok(())
808 }
809
810 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
812 &self,
813 leader_certificate: BatchCertificate<N>,
814 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
815 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
817 let mut already_ordered = HashSet::new();
819 let mut buffer = vec![leader_certificate];
821 while let Some(certificate) = buffer.pop() {
823 commit.entry(certificate.round()).or_default().insert(certificate.clone());
825
826 let previous_round = certificate.round().saturating_sub(1);
831 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
832 continue;
833 }
834 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
838 if already_ordered.contains(previous_certificate_id) {
840 continue;
841 }
842 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
844 continue;
845 }
846 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
848 continue;
849 }
850
851 let previous_certificate = {
853 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
855 Some(previous_certificate) => previous_certificate,
857 None => match self.storage().get_certificate(*previous_certificate_id) {
859 Some(previous_certificate) => previous_certificate,
861 None => bail!(
863 "Missing previous certificate {} for round {previous_round}",
864 fmt_id(previous_certificate_id)
865 ),
866 },
867 }
868 };
869 already_ordered.insert(previous_certificate.id());
871 buffer.push(previous_certificate);
873 }
874 }
875 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
877 Ok(commit)
879 }
880
881 fn is_linked(
883 &self,
884 previous_certificate: BatchCertificate<N>,
885 current_certificate: BatchCertificate<N>,
886 ) -> Result<bool> {
887 let mut traversal = vec![current_certificate.clone()];
889 for round in (previous_certificate.round()..current_certificate.round()).rev() {
891 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
893 bail!("BFT failed to retrieve the certificates for past round {round}");
896 };
897 traversal = certificates
899 .into_values()
900 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
901 .collect();
902 }
903 Ok(traversal.contains(&previous_certificate))
904 }
905}
906
907impl<N: Network> BFT<N> {
908 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
910 let BFTReceiver {
911 mut rx_primary_round,
912 mut rx_primary_certificate,
913 mut rx_sync_bft_dag_at_bootup,
914 mut rx_sync_bft,
915 mut rx_sync_block_committed,
916 } = bft_receiver;
917
918 let self_ = self.clone();
920 self.spawn(async move {
921 while let Some((current_round, callback)) = rx_primary_round.recv().await {
922 callback.send(self_.update_to_next_round(current_round)).ok();
923 }
924 });
925
926 let self_ = self.clone();
928 self.spawn(async move {
929 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
930 let result = self_.update_dag::<true, false>(certificate).await;
932 callback.send(result).ok();
935 }
936 });
937
938 let self_ = self.clone();
940 self.spawn(async move {
941 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
942 self_.sync_bft_dag_at_bootup(certificates).await;
943 }
944 });
945
946 let self_ = self.clone();
948 self.spawn(async move {
949 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
950 let result = self_.update_dag::<true, true>(certificate).await;
952 callback.send(result).ok();
955 }
956 });
957
958 let self_ = self.clone();
964 self.spawn(async move {
965 while let Some((leader_certificate, callback)) = rx_sync_block_committed.recv().await {
966 self_.dag.write().commit(&leader_certificate, self_.storage().max_gc_rounds());
967 callback.send(Ok(())).ok();
968 }
969 });
970 }
971
972 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
979 let mut dag = self.dag.write();
981
982 for certificate in certificates {
984 dag.commit(&certificate, self.storage().max_gc_rounds());
985 }
986 }
987
988 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
990 self.handles.lock().push(tokio::spawn(future));
991 }
992
993 pub async fn shut_down(&self) {
995 info!("Shutting down the BFT...");
996 let _lock = self.lock.lock().await;
998 self.primary.shut_down().await;
1000 self.handles.lock().iter().for_each(|handle| handle.abort());
1002 }
1003}
1004
1005#[cfg(test)]
1006mod tests {
1007 use crate::{
1008 BFT,
1009 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
1010 helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
1011 };
1012
1013 use snarkos_account::Account;
1014 use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
1015 use snarkos_node_bft_storage_service::BFTMemoryService;
1016 use snarkos_node_sync::BlockSync;
1017 use snarkos_utilities::NodeDataDir;
1018
1019 use snarkvm::{
1020 console::account::{Address, PrivateKey},
1021 ledger::{
1022 committee::{
1023 Committee,
1024 test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
1025 },
1026 narwhal::{
1027 BatchCertificate,
1028 batch_certificate::test_helpers::{
1029 sample_batch_certificate,
1030 sample_batch_certificate_for_round,
1031 sample_batch_certificate_for_round_with_committee,
1032 },
1033 },
1034 },
1035 utilities::TestRng,
1036 };
1037
1038 use anyhow::Result;
1039 use indexmap::{IndexMap, IndexSet};
1040 use std::sync::Arc;
1041
1042 type CurrentNetwork = snarkvm::console::network::MainnetV0;
1043
1044 fn sample_test_instance(
1046 committee_round: Option<u64>,
1047 max_gc_rounds: u64,
1048 rng: &mut TestRng,
1049 ) -> (
1050 Committee<CurrentNetwork>,
1051 Account<CurrentNetwork>,
1052 Arc<MockLedgerService<CurrentNetwork>>,
1053 Storage<CurrentNetwork>,
1054 ) {
1055 let committee = match committee_round {
1056 Some(round) => sample_committee_for_round(round, rng),
1057 None => sample_committee(rng),
1058 };
1059 let account = Account::new(rng).unwrap();
1060 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1061 let transmissions = Arc::new(BFTMemoryService::new());
1062 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1063
1064 (committee, account, ledger, storage)
1065 }
1066
1067 fn initialize_bft(
1069 account: Account<CurrentNetwork>,
1070 storage: Storage<CurrentNetwork>,
1071 ledger: Arc<MockLedgerService<CurrentNetwork>>,
1072 ) -> anyhow::Result<BFT<CurrentNetwork>> {
1073 let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1075 BFT::new(
1077 account.clone(),
1078 storage.clone(),
1079 ledger.clone(),
1080 block_sync,
1081 None,
1082 &[],
1083 false,
1084 NodeDataDir::new_test(None),
1085 None,
1086 )
1087 }
1088
1089 #[test]
1090 #[tracing_test::traced_test]
1091 fn test_is_leader_quorum_odd() -> Result<()> {
1092 let rng = &mut TestRng::default();
1093
1094 let mut certificates = IndexSet::new();
1096 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1097 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1098 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1099 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1100
1101 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1103 1,
1104 vec![
1105 certificates[0].author(),
1106 certificates[1].author(),
1107 certificates[2].author(),
1108 certificates[3].author(),
1109 ],
1110 rng,
1111 );
1112
1113 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1115 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1117 let account = Account::new(rng)?;
1119 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1121 assert!(bft.is_timer_expired());
1122 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1124 assert!(!result);
1126 for certificate in certificates.iter() {
1128 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1129 }
1130 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1132 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1135 *bft.leader_certificate.write() = Some(leader_certificate);
1136 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1138 assert!(result); Ok(())
1141 }
1142
1143 #[test]
1144 #[tracing_test::traced_test]
1145 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1146 let rng = &mut TestRng::default();
1147
1148 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1150 assert_eq!(committee.starting_round(), 1);
1151 assert_eq!(storage.current_round(), 1);
1152 assert_eq!(storage.max_gc_rounds(), 10);
1153
1154 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1156 assert!(bft.is_timer_expired());
1157
1158 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1161 assert!(!result);
1162 Ok(())
1163 }
1164
1165 #[test]
1166 #[tracing_test::traced_test]
1167 fn test_is_leader_quorum_even() -> Result<()> {
1168 let rng = &mut TestRng::default();
1169
1170 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1172 assert_eq!(committee.starting_round(), 2);
1173 assert_eq!(storage.current_round(), 2);
1174 assert_eq!(storage.max_gc_rounds(), 10);
1175
1176 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1178 assert!(bft.is_timer_expired());
1179
1180 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1182 assert!(!result);
1183 Ok(())
1184 }
1185
1186 #[test]
1187 #[tracing_test::traced_test]
1188 fn test_is_even_round_ready() -> Result<()> {
1189 let rng = &mut TestRng::default();
1190
1191 let mut certificates = IndexSet::new();
1193 certificates.insert(sample_batch_certificate_for_round(2, rng));
1194 certificates.insert(sample_batch_certificate_for_round(2, rng));
1195 certificates.insert(sample_batch_certificate_for_round(2, rng));
1196 certificates.insert(sample_batch_certificate_for_round(2, rng));
1197
1198 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1200 2,
1201 vec![
1202 certificates[0].author(),
1203 certificates[1].author(),
1204 certificates[2].author(),
1205 certificates[3].author(),
1206 ],
1207 rng,
1208 );
1209
1210 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1212 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1214 let account = Account::new(rng)?;
1216
1217 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1219 assert!(bft.is_timer_expired());
1220
1221 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1223 *bft.leader_certificate.write() = Some(leader_certificate);
1224 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1225 assert!(!result);
1227 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1229 assert!(result);
1230
1231 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1233 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1235 if !bft_timer.is_timer_expired() {
1236 assert!(!result);
1237 }
1238 let leader_certificate_timeout =
1240 std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1241 std::thread::sleep(leader_certificate_timeout);
1242 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1244 if bft_timer.is_timer_expired() {
1245 assert!(result);
1246 } else {
1247 assert!(!result);
1248 }
1249
1250 Ok(())
1251 }
1252
1253 #[test]
1254 #[tracing_test::traced_test]
1255 fn test_update_leader_certificate_odd() -> Result<()> {
1256 let rng = &mut TestRng::default();
1257
1258 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1260 assert_eq!(storage.max_gc_rounds(), 10);
1261
1262 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1264 assert!(bft.is_timer_expired());
1265
1266 let result = bft.update_leader_certificate_to_even_round(1);
1268 assert!(!result);
1269 Ok(())
1270 }
1271
1272 #[test]
1273 #[tracing_test::traced_test]
1274 fn test_update_leader_certificate_bad_round() -> Result<()> {
1275 let rng = &mut TestRng::default();
1276
1277 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1279 assert_eq!(storage.max_gc_rounds(), 10);
1280
1281 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1283
1284 let result = bft.update_leader_certificate_to_even_round(6);
1286 assert!(!result);
1287 Ok(())
1288 }
1289
1290 #[test]
1291 #[tracing_test::traced_test]
1292 fn test_update_leader_certificate_even() -> Result<()> {
1293 let rng = &mut TestRng::default();
1294
1295 let current_round = 3;
1297
1298 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1300 current_round,
1301 rng,
1302 );
1303
1304 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1306 2,
1307 vec![
1308 certificates[0].author(),
1309 certificates[1].author(),
1310 certificates[2].author(),
1311 certificates[3].author(),
1312 ],
1313 rng,
1314 );
1315
1316 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1318
1319 let transmissions = Arc::new(BFTMemoryService::new());
1321 let storage = Storage::new(ledger.clone(), transmissions, 10);
1322 storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1323 storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1324 storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1325 storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1326 assert_eq!(storage.current_round(), 2);
1327
1328 let leader = committee.get_leader(2).unwrap();
1330 let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1331
1332 let account = Account::new(rng)?;
1334 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1335
1336 *bft.leader_certificate.write() = Some(leader_certificate);
1338
1339 let result = bft.update_leader_certificate_to_even_round(2);
1342 assert!(result);
1343
1344 Ok(())
1345 }
1346
1347 #[tokio::test]
1348 #[tracing_test::traced_test]
1349 async fn test_order_dag_with_dfs() -> Result<()> {
1350 let rng = &mut TestRng::default();
1351
1352 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1354
1355 let previous_round = 2; let current_round = previous_round + 1;
1358
1359 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1361 current_round,
1362 rng,
1363 );
1364
1365 {
1369 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1371 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1373
1374 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1376
1377 for certificate in previous_certificates.clone() {
1379 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1380 }
1381
1382 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1384 assert!(result.is_ok());
1385 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1386 assert_eq!(candidate_certificates.len(), 1);
1387 let expected_certificates = vec![certificate.clone()];
1388 assert_eq!(
1389 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1390 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1391 );
1392 assert_eq!(candidate_certificates, expected_certificates);
1393 }
1394
1395 {
1399 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1401 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1403
1404 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1406
1407 for certificate in previous_certificates.clone() {
1409 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1410 }
1411
1412 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1414 assert!(result.is_ok());
1415 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1416 assert_eq!(candidate_certificates.len(), 5);
1417 let expected_certificates = vec![
1418 previous_certificates[0].clone(),
1419 previous_certificates[1].clone(),
1420 previous_certificates[2].clone(),
1421 previous_certificates[3].clone(),
1422 certificate,
1423 ];
1424 assert_eq!(
1425 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1426 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1427 );
1428 assert_eq!(candidate_certificates, expected_certificates);
1429 }
1430
1431 Ok(())
1432 }
1433
1434 #[test]
1435 #[tracing_test::traced_test]
1436 fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1437 let rng = &mut TestRng::default();
1438
1439 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1441 assert_eq!(committee.starting_round(), 1);
1442 assert_eq!(storage.current_round(), 1);
1443 assert_eq!(storage.max_gc_rounds(), 1);
1444
1445 let previous_round = 2; let current_round = previous_round + 1;
1448
1449 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1451 current_round,
1452 rng,
1453 );
1454 let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1456
1457 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1461
1462 let error_msg = format!(
1464 "Missing previous certificate {} for round {previous_round}",
1465 crate::helpers::fmt_id(previous_certificate_ids[3]),
1466 );
1467
1468 let result = bft.order_dag_with_dfs::<false>(certificate);
1470 assert!(result.is_err());
1471 assert_eq!(result.unwrap_err().to_string(), error_msg);
1472 Ok(())
1473 }
1474
1475 #[tokio::test]
1476 async fn test_bft_gc_on_commit() -> Result<()> {
1477 let rng = &mut TestRng::default();
1478
1479 let max_gc_rounds = 1;
1481 let committee_round = 0;
1482 let commit_round = 2;
1483 let current_round = commit_round + 1;
1484
1485 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1487 current_round,
1488 rng,
1489 );
1490
1491 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1493 committee_round,
1494 vec![
1495 certificates[0].author(),
1496 certificates[1].author(),
1497 certificates[2].author(),
1498 certificates[3].author(),
1499 ],
1500 rng,
1501 );
1502
1503 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1505
1506 let transmissions = Arc::new(BFTMemoryService::new());
1508 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1509 for certificate in certificates.iter() {
1511 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1512 }
1513
1514 let leader = committee.get_leader(commit_round).unwrap();
1516 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1517
1518 let account = Account::new(rng)?;
1520 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1521
1522 *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1524
1525 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1527
1528 for certificate in certificates {
1530 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1531 }
1532
1533 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1535
1536 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1538
1539 Ok(())
1540 }
1541
1542 #[tokio::test]
1543 #[tracing_test::traced_test]
1544 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1545 let rng = &mut TestRng::default();
1546
1547 let max_gc_rounds = 1;
1549 let committee_round = 0;
1550 let commit_round = 2;
1551 let current_round = commit_round + 1;
1552
1553 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1555 current_round,
1556 rng,
1557 );
1558
1559 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1561 committee_round,
1562 vec![
1563 certificates[0].author(),
1564 certificates[1].author(),
1565 certificates[2].author(),
1566 certificates[3].author(),
1567 ],
1568 rng,
1569 );
1570
1571 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1573
1574 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1576 for certificate in certificates.iter() {
1578 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1579 }
1580
1581 let leader = committee.get_leader(commit_round).unwrap();
1583 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1584
1585 let account = Account::new(rng)?;
1587 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1588
1589 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1591
1592 for certificate in certificates.clone() {
1594 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1595 }
1596
1597 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1599
1600 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1604 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1606
1607 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1609
1610 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1612
1613 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1615 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1616
1617 for certificate in certificates {
1619 let certificate_round = certificate.round();
1620 let certificate_id = certificate.id();
1621 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1623 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1626 }
1627
1628 Ok(())
1629 }
1630
1631 #[tokio::test]
1632 #[tracing_test::traced_test]
1633 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1634 let rng = &mut TestRng::default();
1641
1642 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1644 let committee_round = 0;
1645 let commit_round = 2;
1646 let current_round = commit_round + 1;
1647 let next_round = current_round + 1;
1648
1649 let (round_to_certificates_map, committee) = {
1651 let private_keys = vec![
1652 PrivateKey::new(rng).unwrap(),
1653 PrivateKey::new(rng).unwrap(),
1654 PrivateKey::new(rng).unwrap(),
1655 PrivateKey::new(rng).unwrap(),
1656 ];
1657 let addresses = vec![
1658 Address::try_from(private_keys[0])?,
1659 Address::try_from(private_keys[1])?,
1660 Address::try_from(private_keys[2])?,
1661 Address::try_from(private_keys[3])?,
1662 ];
1663 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1664 committee_round,
1665 addresses,
1666 rng,
1667 );
1668 let mut round_to_certificates_map: IndexMap<
1670 u64,
1671 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1672 > = IndexMap::new();
1673 let mut previous_certificates = IndexSet::with_capacity(4);
1674 for _ in 0..4 {
1676 previous_certificates.insert(sample_batch_certificate(rng));
1677 }
1678 for round in 0..commit_round + 3 {
1679 let mut current_certificates = IndexSet::new();
1680 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1681 IndexSet::new()
1682 } else {
1683 previous_certificates.iter().map(|c| c.id()).collect()
1684 };
1685 let transmission_ids =
1686 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1687 .into_iter()
1688 .collect::<IndexSet<_>>();
1689 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1690 let committee_id = committee.id();
1691 for (i, private_key_1) in private_keys.iter().enumerate() {
1692 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1693 private_key_1,
1694 round,
1695 timestamp,
1696 committee_id,
1697 transmission_ids.clone(),
1698 previous_certificate_ids.clone(),
1699 rng,
1700 )
1701 .unwrap();
1702 let mut signatures = IndexSet::with_capacity(4);
1703 for (j, private_key_2) in private_keys.iter().enumerate() {
1704 if i != j {
1705 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1706 }
1707 }
1708 let certificate =
1709 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1710 current_certificates.insert(certificate);
1711 }
1712 round_to_certificates_map.insert(round, current_certificates.clone());
1714 previous_certificates = current_certificates.clone();
1715 }
1716 (round_to_certificates_map, committee)
1717 };
1718
1719 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1721 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1723 let leader = committee.get_leader(commit_round).unwrap();
1725 let next_leader = committee.get_leader(next_round).unwrap();
1726 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1728 for i in 1..=commit_round {
1729 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1730 if i == commit_round {
1731 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1733 if let Some(c) = leader_certificate {
1734 pre_shutdown_certificates.push(c.clone());
1735 }
1736 continue;
1737 }
1738 pre_shutdown_certificates.extend(certificates);
1739 }
1740 for certificate in pre_shutdown_certificates.iter() {
1741 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1742 }
1743 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1745 Vec::new();
1746 for j in commit_round..=commit_round + 2 {
1747 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1748 post_shutdown_certificates.extend(certificate);
1749 }
1750 for certificate in post_shutdown_certificates.iter() {
1751 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1752 }
1753 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1755 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1756
1757 let account = Account::new(rng)?;
1759 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1760
1761 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1763
1764 for certificate in pre_shutdown_certificates.clone() {
1766 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1767 }
1768
1769 for certificate in post_shutdown_certificates.clone() {
1771 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1772 }
1773 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1775 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1776 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1777
1778 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1782
1783 let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1785
1786 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1788
1789 for certificate in post_shutdown_certificates.iter() {
1791 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1792 }
1793 for certificate in post_shutdown_certificates.clone() {
1794 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1795 }
1796 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1798 let commit_subdag_metadata_bootup =
1799 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1800 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1801 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1802
1803 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1807
1808 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1810 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1811 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1812 assert!(
1813 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1814 );
1815
1816 for certificate in pre_shutdown_certificates.clone() {
1818 let certificate_round = certificate.round();
1819 let certificate_id = certificate.id();
1820 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1822 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1823 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1826 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1827 }
1828
1829 for certificate in committed_certificates_bootup.clone() {
1831 let certificate_round = certificate.round();
1832 let certificate_id = certificate.id();
1833 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1835 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1836 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1839 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1840 }
1841
1842 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1844
1845 Ok(())
1846 }
1847
1848 #[tokio::test]
1849 #[tracing_test::traced_test]
1850 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1851 let rng = &mut TestRng::default();
1858
1859 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1861 let committee_round = 0;
1862 let commit_round = 2;
1863 let current_round = commit_round + 1;
1864 let next_round = current_round + 1;
1865
1866 let (round_to_certificates_map, committee) = {
1868 let private_keys = vec![
1869 PrivateKey::new(rng).unwrap(),
1870 PrivateKey::new(rng).unwrap(),
1871 PrivateKey::new(rng).unwrap(),
1872 PrivateKey::new(rng).unwrap(),
1873 ];
1874 let addresses = vec![
1875 Address::try_from(private_keys[0])?,
1876 Address::try_from(private_keys[1])?,
1877 Address::try_from(private_keys[2])?,
1878 Address::try_from(private_keys[3])?,
1879 ];
1880 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1881 committee_round,
1882 addresses,
1883 rng,
1884 );
1885 let mut round_to_certificates_map: IndexMap<
1887 u64,
1888 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1889 > = IndexMap::new();
1890 let mut previous_certificates = IndexSet::with_capacity(4);
1891 for _ in 0..4 {
1893 previous_certificates.insert(sample_batch_certificate(rng));
1894 }
1895 for round in 0..=commit_round + 2 {
1896 let mut current_certificates = IndexSet::new();
1897 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1898 IndexSet::new()
1899 } else {
1900 previous_certificates.iter().map(|c| c.id()).collect()
1901 };
1902 let transmission_ids =
1903 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1904 .into_iter()
1905 .collect::<IndexSet<_>>();
1906 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1907 let committee_id = committee.id();
1908 for (i, private_key_1) in private_keys.iter().enumerate() {
1909 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1910 private_key_1,
1911 round,
1912 timestamp,
1913 committee_id,
1914 transmission_ids.clone(),
1915 previous_certificate_ids.clone(),
1916 rng,
1917 )
1918 .unwrap();
1919 let mut signatures = IndexSet::with_capacity(4);
1920 for (j, private_key_2) in private_keys.iter().enumerate() {
1921 if i != j {
1922 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1923 }
1924 }
1925 let certificate =
1926 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1927 current_certificates.insert(certificate);
1928 }
1929 round_to_certificates_map.insert(round, current_certificates.clone());
1931 previous_certificates = current_certificates.clone();
1932 }
1933 (round_to_certificates_map, committee)
1934 };
1935
1936 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1938 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1940 let leader = committee.get_leader(commit_round).unwrap();
1942 let next_leader = committee.get_leader(next_round).unwrap();
1943 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1945 for i in 1..=commit_round {
1946 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1947 if i == commit_round {
1948 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1950 if let Some(c) = leader_certificate {
1951 pre_shutdown_certificates.push(c.clone());
1952 }
1953 continue;
1954 }
1955 pre_shutdown_certificates.extend(certificates);
1956 }
1957 for certificate in pre_shutdown_certificates.iter() {
1958 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1959 }
1960 let account = Account::new(rng)?;
1962 let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1963
1964 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1966 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1968
1969 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1971 Vec::new();
1972 for j in commit_round..=commit_round + 2 {
1973 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1974 post_shutdown_certificates.extend(certificate);
1975 }
1976 for certificate in post_shutdown_certificates.iter() {
1977 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1978 }
1979
1980 for certificate in post_shutdown_certificates.clone() {
1982 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1983 }
1984
1985 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1987 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1988 let committed_certificates = commit_subdag.values().flatten();
1989
1990 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1992 for committed_certificate in committed_certificates.clone() {
1993 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1994 }
1995 }
1996 Ok(())
1997 }
1998
1999 #[test_log::test(tokio::test)]
2001 async fn test_commit_via_is_linked() {
2002 let rng = &mut TestRng::default();
2003
2004 let committee_round = 0;
2005 let leader_round_1 = 2;
2006 let leader_round_2 = 4; let max_gc_rounds = 50;
2008
2009 let num_authors = 4;
2011 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2012 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2013
2014 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2015 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2016 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2017 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2018
2019 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2020
2021 let round1_certs: IndexSet<_> = (0..num_authors)
2023 .map(|idx| {
2024 let author = &private_keys[idx];
2025 let endorsements: Vec<_> = private_keys
2026 .iter()
2027 .enumerate()
2028 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2029 .collect();
2030
2031 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2032 })
2033 .collect();
2034 certificates_by_round.insert(1, round1_certs.clone());
2035
2036 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2037 let mut leader1_certificate = None;
2038
2039 let round2_certs: IndexSet<_> = (0..num_authors)
2040 .map(|idx| {
2041 let author = &private_keys[idx];
2042 let endorsements: Vec<_> = private_keys
2043 .iter()
2044 .enumerate()
2045 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2046 .collect();
2047 let cert = sample_batch_certificate_for_round_with_committee(
2048 leader_round_1,
2049 round1_certs.iter().map(|c| c.id()).collect(),
2050 author,
2051 &endorsements[..],
2052 rng,
2053 );
2054
2055 if cert.author() == leader1 {
2056 leader1_certificate = Some(cert.clone());
2057 }
2058 cert
2059 })
2060 .collect();
2061 certificates_by_round.insert(leader_round_1, round2_certs.clone());
2062
2063 let round3_certs: IndexSet<_> = (0..num_authors)
2064 .map(|idx| {
2065 let author = &private_keys[idx];
2066 let endorsements: Vec<_> = private_keys
2067 .iter()
2068 .enumerate()
2069 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2070 .collect();
2071
2072 let previous_certificate_ids: IndexSet<_> = round2_certs
2073 .iter()
2074 .filter_map(|cert| {
2075 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2077 })
2078 .collect();
2079
2080 sample_batch_certificate_for_round_with_committee(
2081 leader_round_1 + 1,
2082 previous_certificate_ids,
2083 author,
2084 &endorsements[..],
2085 rng,
2086 )
2087 })
2088 .collect();
2089 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2090
2091 let leader_certificate_1 = leader1_certificate.unwrap();
2093 assert!(
2094 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2095 "Leader certificate 1 should not be committed yet"
2096 );
2097 assert_eq!(bft.dag.read().last_committed_round(), 0);
2098
2099 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2100 let round4_certs: IndexSet<_> = (0..num_authors)
2101 .map(|idx| {
2102 let endorsements: Vec<_> = private_keys
2103 .iter()
2104 .enumerate()
2105 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2106 .collect();
2107
2108 sample_batch_certificate_for_round_with_committee(
2109 leader_round_2,
2110 round3_certs.iter().map(|c| c.id()).collect(),
2111 &private_keys[idx],
2112 &endorsements[..],
2113 rng,
2114 )
2115 })
2116 .collect();
2117 certificates_by_round.insert(leader_round_2, round4_certs.clone());
2118
2119 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2121 storage.testing_only_insert_certificate_testing_only(certificate.clone());
2122 bft.update_dag::<false, false>(certificate).await.unwrap();
2123 }
2124
2125 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2126
2127 assert!(
2128 bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2129 "Leader certificate 1 should be linked to leader certificate 2"
2130 );
2131
2132 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2134
2135 assert!(
2137 bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2138 "Leader certificate for round 2 should be committed when committing at round 4"
2139 );
2140
2141 assert!(
2143 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2144 "Leader certificate for round 4 should be committed"
2145 );
2146
2147 assert_eq!(bft.dag.read().last_committed_round(), 4);
2148 }
2149
2150 #[test_log::test(tokio::test)]
2151 async fn test_commit_via_is_linked_with_skipped_anchor() {
2152 let rng = &mut TestRng::default();
2153
2154 let committee_round = 0;
2155 let leader_round_1 = 2;
2156 let leader_round_2 = 4;
2157 let max_gc_rounds = 50;
2158
2159 let num_authors = 4;
2160 let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2161 let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2162
2163 let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2164 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2165 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2166 let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2167
2168 let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2169
2170 let round1_certs: IndexSet<_> = (0..num_authors)
2172 .map(|idx| {
2173 let author = &private_keys[idx];
2174 let endorsements: Vec<_> = private_keys
2175 .iter()
2176 .enumerate()
2177 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2178 .collect();
2179
2180 sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2181 })
2182 .collect();
2183 certificates_by_round.insert(1, round1_certs.clone());
2184
2185 let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2186 let mut leader1_certificate = None;
2187
2188 let round2_certs: IndexSet<_> = (0..num_authors)
2189 .map(|idx| {
2190 let author = &private_keys[idx];
2191 let endorsements: Vec<_> = private_keys
2192 .iter()
2193 .enumerate()
2194 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2195 .collect();
2196 let cert = sample_batch_certificate_for_round_with_committee(
2197 leader_round_1,
2198 round1_certs.iter().map(|c| c.id()).collect(),
2199 author,
2200 &endorsements[..],
2201 rng,
2202 );
2203
2204 if cert.author() == leader1 {
2205 leader1_certificate = Some(cert.clone());
2206 }
2207 cert
2208 })
2209 .collect();
2210 certificates_by_round.insert(leader_round_1, round2_certs.clone());
2211
2212 let round3_certs: IndexSet<_> = (0..num_authors)
2213 .map(|idx| {
2214 let author = &private_keys[idx];
2215 let endorsements: Vec<_> = private_keys
2216 .iter()
2217 .enumerate()
2218 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2219 .collect();
2220
2221 let previous_certificate_ids: IndexSet<_> = round2_certs
2222 .iter()
2223 .filter_map(|cert| {
2224 if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2226 })
2227 .collect();
2228
2229 sample_batch_certificate_for_round_with_committee(
2230 leader_round_1 + 1,
2231 previous_certificate_ids,
2232 author,
2233 &endorsements[..],
2234 rng,
2235 )
2236 })
2237 .collect();
2238 certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2239
2240 let leader_certificate_1 = leader1_certificate.unwrap();
2242 assert!(
2243 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2244 "Leader certificate 1 should not be committed yet"
2245 );
2246
2247 let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2248 let round4_certs: IndexSet<_> = (0..num_authors)
2249 .map(|idx| {
2250 let endorsements: Vec<_> = private_keys
2251 .iter()
2252 .enumerate()
2253 .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2254 .collect();
2255
2256 let previous_certificate_ids: IndexSet<_> = round3_certs
2258 .iter()
2259 .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2260 .collect();
2261
2262 sample_batch_certificate_for_round_with_committee(
2263 leader_round_2,
2264 previous_certificate_ids,
2265 &private_keys[idx],
2266 &endorsements[..],
2267 rng,
2268 )
2269 })
2270 .collect();
2271 certificates_by_round.insert(leader_round_2, round4_certs.clone());
2272
2273 for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2275 storage.testing_only_insert_certificate_testing_only(certificate.clone());
2276 bft.update_dag::<false, false>(certificate).await.unwrap();
2277 }
2278
2279 let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2280
2281 assert!(
2282 !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2283 "Leader certificate 1 should not be linked to leader certificate 2"
2284 );
2285 assert_eq!(bft.dag.read().last_committed_round(), 0);
2286
2287 bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2289
2290 assert!(
2292 !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2293 "Leader certificate for round 2 should not be committed when committing at round 4"
2294 );
2295
2296 assert!(
2298 bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2299 "Leader certificate for round 4 should be committed"
2300 );
2301 assert_eq!(bft.dag.read().last_committed_round(), 4);
2302 }
2303}