1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18 Primary,
19 helpers::{
20 BFTReceiver,
21 ConsensusSender,
22 DAG,
23 PrimaryReceiver,
24 PrimarySender,
25 Storage,
26 fmt_id,
27 init_bft_channels,
28 now,
29 },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkvm::{
34 console::account::Address,
35 ledger::{
36 block::Transaction,
37 committee::Committee,
38 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
39 puzzle::{Solution, SolutionID},
40 },
41 prelude::{Field, Network, Result, bail, ensure},
42};
43
44use colored::Colorize;
45use indexmap::{IndexMap, IndexSet};
46use parking_lot::{Mutex, RwLock};
47use std::{
48 collections::{BTreeMap, HashSet},
49 future::Future,
50 net::SocketAddr,
51 sync::{
52 Arc,
53 atomic::{AtomicI64, Ordering},
54 },
55};
56use tokio::{
57 sync::{Mutex as TMutex, OnceCell, oneshot},
58 task::JoinHandle,
59};
60
61#[derive(Clone)]
62pub struct BFT<N: Network> {
63 primary: Primary<N>,
65 dag: Arc<RwLock<DAG<N>>>,
67 leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
69 leader_certificate_timer: Arc<AtomicI64>,
71 consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
73 handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
75 lock: Arc<TMutex<()>>,
77}
78
79impl<N: Network> BFT<N> {
80 pub fn new(
82 account: Account<N>,
83 storage: Storage<N>,
84 ledger: Arc<dyn LedgerService<N>>,
85 ip: Option<SocketAddr>,
86 trusted_validators: &[SocketAddr],
87 dev: Option<u16>,
88 ) -> Result<Self> {
89 Ok(Self {
90 primary: Primary::new(account, storage, ledger, ip, trusted_validators, dev)?,
91 dag: Default::default(),
92 leader_certificate: Default::default(),
93 leader_certificate_timer: Default::default(),
94 consensus_sender: Default::default(),
95 handles: Default::default(),
96 lock: Default::default(),
97 })
98 }
99
100 pub async fn run(
102 &mut self,
103 consensus_sender: Option<ConsensusSender<N>>,
104 primary_sender: PrimarySender<N>,
105 primary_receiver: PrimaryReceiver<N>,
106 ) -> Result<()> {
107 info!("Starting the BFT instance...");
108 let (bft_sender, bft_receiver) = init_bft_channels::<N>();
110 self.start_handlers(bft_receiver);
112 self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
114 if let Some(consensus_sender) = consensus_sender {
117 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
118 }
119 Ok(())
120 }
121
122 pub fn is_synced(&self) -> bool {
124 self.primary.is_synced()
125 }
126
127 pub const fn primary(&self) -> &Primary<N> {
129 &self.primary
130 }
131
132 pub const fn storage(&self) -> &Storage<N> {
134 self.primary.storage()
135 }
136
137 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
139 self.primary.ledger()
140 }
141
142 pub fn leader(&self) -> Option<Address<N>> {
144 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
145 }
146
147 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
149 &self.leader_certificate
150 }
151}
152
153impl<N: Network> BFT<N> {
154 pub fn num_unconfirmed_transmissions(&self) -> usize {
156 self.primary.num_unconfirmed_transmissions()
157 }
158
159 pub fn num_unconfirmed_ratifications(&self) -> usize {
161 self.primary.num_unconfirmed_ratifications()
162 }
163
164 pub fn num_unconfirmed_solutions(&self) -> usize {
166 self.primary.num_unconfirmed_solutions()
167 }
168
169 pub fn num_unconfirmed_transactions(&self) -> usize {
171 self.primary.num_unconfirmed_transactions()
172 }
173}
174
175impl<N: Network> BFT<N> {
176 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
178 self.primary.worker_transmission_ids()
179 }
180
181 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
183 self.primary.worker_transmissions()
184 }
185
186 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
188 self.primary.worker_solutions()
189 }
190
191 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
193 self.primary.worker_transactions()
194 }
195}
196
197impl<N: Network> BFT<N> {
198 fn update_to_next_round(&self, current_round: u64) -> bool {
200 let storage_round = self.storage().current_round();
202 if current_round < storage_round {
203 debug!(
204 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
205 );
206 return false;
207 }
208
209 let is_ready = match current_round % 2 == 0 {
211 true => self.update_leader_certificate_to_even_round(current_round),
212 false => self.is_leader_quorum_or_nonleaders_available(current_round),
213 };
214
215 #[cfg(feature = "metrics")]
216 {
217 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
218 if start > 0 {
220 let end = now();
221 let elapsed = std::time::Duration::from_secs((end - start) as u64);
222 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
223 }
224 }
225
226 if current_round % 2 == 0 {
228 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
230 if !is_ready {
232 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
233 }
234 let leader_round = leader_certificate.round();
236 match leader_round == current_round {
237 true => {
238 info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
239 #[cfg(feature = "metrics")]
240 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
241 }
242 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
243 }
244 } else {
245 match is_ready {
246 true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
247 false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
248 }
249 }
250 }
251
252 if is_ready {
254 if let Err(e) = self.storage().increment_to_next_round(current_round) {
256 warn!("BFT failed to increment to the next round from round {current_round} - {e}");
257 return false;
258 }
259 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
261 }
262
263 is_ready
264 }
265
266 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
272 let current_round = self.storage().current_round();
274 if current_round != even_round {
276 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
277 return false;
278 }
279
280 if current_round % 2 != 0 || current_round < 2 {
282 error!("BFT cannot update the leader certificate in an odd round");
283 return false;
284 }
285
286 let current_certificates = self.storage().get_certificates_for_round(current_round);
288 if current_certificates.is_empty() {
290 *self.leader_certificate.write() = None;
292 return false;
293 }
294
295 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
297 Ok(committee) => committee,
298 Err(e) => {
299 error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
300 return false;
301 }
302 };
303 let leader = match self.ledger().latest_leader() {
305 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
306 _ => {
307 let computed_leader = match committee_lookback.get_leader(current_round) {
309 Ok(leader) => leader,
310 Err(e) => {
311 error!("BFT failed to compute the leader for the even round {current_round} - {e}");
312 return false;
313 }
314 };
315
316 self.ledger().update_latest_leader(current_round, computed_leader);
318
319 computed_leader
320 }
321 };
322 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
324 *self.leader_certificate.write() = leader_certificate.cloned();
325
326 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
327 }
328
329 fn is_even_round_ready_for_next_round(
333 &self,
334 certificates: IndexSet<BatchCertificate<N>>,
335 committee: Committee<N>,
336 current_round: u64,
337 ) -> bool {
338 let authors = certificates.into_iter().map(|c| c.author()).collect();
340 if !committee.is_quorum_threshold_reached(&authors) {
342 trace!("BFT failed to reach quorum threshold in even round {current_round}");
343 return false;
344 }
345 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
347 if leader_certificate.round() == current_round {
348 return true;
349 }
350 }
351 if self.is_timer_expired() {
353 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
354 return true;
355 }
356 false
358 }
359
360 fn is_timer_expired(&self) -> bool {
362 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
363 }
364
365 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
370 let current_round = self.storage().current_round();
372 if current_round != odd_round {
374 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
375 return false;
376 }
377 if current_round % 2 != 1 {
379 error!("BFT does not compute stakes for the leader certificate in an even round");
380 return false;
381 }
382 let current_certificates = self.storage().get_certificates_for_round(current_round);
384 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
386 Ok(committee) => committee,
387 Err(e) => {
388 error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
389 return false;
390 }
391 };
392 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
394 if !committee_lookback.is_quorum_threshold_reached(&authors) {
396 trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
397 return false;
398 }
399 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
401 return true;
403 };
404 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
406 leader_certificate.id(),
407 current_certificates,
408 &committee_lookback,
409 );
410 stake_with_leader >= committee_lookback.availability_threshold()
412 || stake_without_leader >= committee_lookback.quorum_threshold()
413 || self.is_timer_expired()
414 }
415
416 fn compute_stake_for_leader_certificate(
418 &self,
419 leader_certificate_id: Field<N>,
420 current_certificates: IndexSet<BatchCertificate<N>>,
421 current_committee: &Committee<N>,
422 ) -> (u64, u64) {
423 if current_certificates.is_empty() {
425 return (0, 0);
426 }
427
428 let mut stake_with_leader = 0u64;
430 let mut stake_without_leader = 0u64;
432 for certificate in current_certificates {
434 let stake = current_committee.get_stake(certificate.author());
436 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
438 true => stake_with_leader = stake_with_leader.saturating_add(stake),
440 false => stake_without_leader = stake_without_leader.saturating_add(stake),
442 }
443 }
444 (stake_with_leader, stake_without_leader)
446 }
447}
448
449impl<N: Network> BFT<N> {
450 async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
452 &self,
453 certificate: BatchCertificate<N>,
454 ) -> Result<()> {
455 let _lock = self.lock.lock().await;
457
458 let certificate_round = certificate.round();
460 self.dag.write().insert(certificate);
462
463 let commit_round = certificate_round.saturating_sub(1);
465 if commit_round % 2 != 0 || commit_round < 2 {
467 return Ok(());
468 }
469 if commit_round <= self.dag.read().last_committed_round() {
471 return Ok(());
472 }
473
474 trace!("Checking if the leader is ready to be committed for round {commit_round}...");
476
477 let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
479 bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
480 };
481
482 let leader = match self.ledger().latest_leader() {
484 Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
485 _ => {
486 let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
488 bail!("BFT failed to compute the leader for commit round {commit_round}");
489 };
490
491 self.ledger().update_latest_leader(commit_round, computed_leader);
493
494 computed_leader
495 }
496 };
497
498 let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
500 else {
501 trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
502 return Ok(());
503 };
504 let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
506 bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
508 };
509 let authors = certificates
511 .values()
512 .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
513 true => Some(c.author()),
514 false => None,
515 })
516 .collect();
517 if !committee_lookback.is_availability_threshold_reached(&authors) {
519 trace!("BFT is not ready to commit {commit_round}");
521 return Ok(());
522 }
523
524 info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
526
527 self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
529 }
530
531 async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
533 &self,
534 leader_certificate: BatchCertificate<N>,
535 ) -> Result<()> {
536 let latest_leader_round = leader_certificate.round();
538 let mut leader_certificates = vec![leader_certificate.clone()];
541 {
542 let leader_round = leader_certificate.round();
544
545 let mut current_certificate = leader_certificate;
546 for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
547 {
548 let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
550 Ok(committee) => committee,
551 Err(e) => {
552 bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
553 }
554 };
555 let leader = match self.ledger().latest_leader() {
557 Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
558 _ => {
559 let computed_leader = match previous_committee_lookback.get_leader(round) {
561 Ok(leader) => leader,
562 Err(e) => {
563 bail!("BFT failed to compute the leader for the even round {round} - {e}");
564 }
565 };
566
567 self.ledger().update_latest_leader(round, computed_leader);
569
570 computed_leader
571 }
572 };
573 let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
575 else {
576 continue;
577 };
578 if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
580 leader_certificates.push(previous_certificate.clone());
582 current_certificate = previous_certificate;
584 }
585 }
586 }
587
588 for leader_certificate in leader_certificates.into_iter().rev() {
590 let leader_round = leader_certificate.round();
592 let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
594 Ok(subdag) => subdag,
595 Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
596 };
597 if !IS_SYNCING {
599 let mut transmissions = IndexMap::new();
601 let mut seen_transaction_ids = IndexSet::new();
603 let mut seen_solution_ids = IndexSet::new();
605 for certificate in commit_subdag.values().flatten() {
607 for transmission_id in certificate.transmission_ids() {
609 match transmission_id {
613 TransmissionID::Solution(solution_id, _) => {
614 if seen_solution_ids.contains(&solution_id) {
616 continue;
617 }
618 }
619 TransmissionID::Transaction(transaction_id, _) => {
620 if seen_transaction_ids.contains(transaction_id) {
622 continue;
623 }
624 }
625 TransmissionID::Ratification => {
626 bail!("Ratifications are currently not supported in the BFT.")
627 }
628 }
629 if transmissions.contains_key(transmission_id) {
631 continue;
632 }
633 if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
636 continue;
637 }
638 let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
640 bail!(
641 "BFT failed to retrieve transmission '{}.{}' from round {}",
642 fmt_id(transmission_id),
643 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
644 certificate.round()
645 );
646 };
647 match transmission_id {
649 TransmissionID::Solution(id, _) => {
650 seen_solution_ids.insert(id);
651 }
652 TransmissionID::Transaction(id, _) => {
653 seen_transaction_ids.insert(id);
654 }
655 TransmissionID::Ratification => {}
656 }
657 transmissions.insert(*transmission_id, transmission);
659 }
660 }
661 let subdag = Subdag::from(commit_subdag.clone())?;
664 let anchor_round = subdag.anchor_round();
666 let num_transmissions = transmissions.len();
668 let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
670
671 ensure!(
673 anchor_round == leader_round,
674 "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
675 );
676
677 if let Some(consensus_sender) = self.consensus_sender.get() {
679 let (callback_sender, callback_receiver) = oneshot::channel();
681 consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
683 match callback_receiver.await {
685 Ok(Ok(())) => (), Ok(Err(e)) => {
687 error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
688 return Ok(());
689 }
690 Err(e) => {
691 error!("BFT failed to receive the callback for round {anchor_round} - {e}");
692 return Ok(());
693 }
694 }
695 }
696
697 info!(
698 "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
699 );
700 }
701
702 let mut dag_write = self.dag.write();
704 for certificate in commit_subdag.values().flatten() {
705 dag_write.commit(certificate, self.storage().max_gc_rounds());
706 }
707 }
708
709 self.storage().garbage_collect_certificates(latest_leader_round);
711
712 Ok(())
713 }
714
715 fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
717 &self,
718 leader_certificate: BatchCertificate<N>,
719 ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
720 let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
722 let mut already_ordered = HashSet::new();
724 let mut buffer = vec![leader_certificate];
726 while let Some(certificate) = buffer.pop() {
728 commit.entry(certificate.round()).or_default().insert(certificate.clone());
730
731 let previous_round = certificate.round().saturating_sub(1);
733 if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
734 continue;
735 }
736 for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
740 if already_ordered.contains(previous_certificate_id) {
742 continue;
743 }
744 if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
746 continue;
747 }
748 if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
750 continue;
751 }
752
753 let previous_certificate = {
755 match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
757 Some(previous_certificate) => previous_certificate,
759 None => match self.storage().get_certificate(*previous_certificate_id) {
761 Some(previous_certificate) => previous_certificate,
763 None => bail!(
765 "Missing previous certificate {} for round {previous_round}",
766 fmt_id(previous_certificate_id)
767 ),
768 },
769 }
770 };
771 already_ordered.insert(previous_certificate.id());
773 buffer.push(previous_certificate);
775 }
776 }
777 commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
779 Ok(commit)
781 }
782
783 fn is_linked(
785 &self,
786 previous_certificate: BatchCertificate<N>,
787 current_certificate: BatchCertificate<N>,
788 ) -> Result<bool> {
789 let mut traversal = vec![current_certificate.clone()];
791 for round in (previous_certificate.round()..current_certificate.round()).rev() {
793 let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
795 bail!("BFT failed to retrieve the certificates for past round {round}");
798 };
799 traversal = certificates
801 .into_values()
802 .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
803 .collect();
804 }
805 Ok(traversal.contains(&previous_certificate))
806 }
807}
808
809impl<N: Network> BFT<N> {
810 fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
812 let BFTReceiver {
813 mut rx_primary_round,
814 mut rx_primary_certificate,
815 mut rx_sync_bft_dag_at_bootup,
816 mut rx_sync_bft,
817 } = bft_receiver;
818
819 let self_ = self.clone();
821 self.spawn(async move {
822 while let Some((current_round, callback)) = rx_primary_round.recv().await {
823 callback.send(self_.update_to_next_round(current_round)).ok();
824 }
825 });
826
827 let self_ = self.clone();
829 self.spawn(async move {
830 while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
831 let result = self_.update_dag::<true, false>(certificate).await;
833 callback.send(result).ok();
836 }
837 });
838
839 let self_ = self.clone();
841 self.spawn(async move {
842 while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
843 self_.sync_bft_dag_at_bootup(certificates).await;
844 }
845 });
846
847 let self_ = self.clone();
849 self.spawn(async move {
850 while let Some((certificate, callback)) = rx_sync_bft.recv().await {
851 let result = self_.update_dag::<true, true>(certificate).await;
853 callback.send(result).ok();
856 }
857 });
858 }
859
860 async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
867 let mut dag = self.dag.write();
869
870 for certificate in certificates {
872 dag.commit(&certificate, self.storage().max_gc_rounds());
873 }
874 }
875
876 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
878 self.handles.lock().push(tokio::spawn(future));
879 }
880
881 pub async fn shut_down(&self) {
883 info!("Shutting down the BFT...");
884 let _lock = self.lock.lock().await;
886 self.primary.shut_down().await;
888 self.handles.lock().iter().for_each(|handle| handle.abort());
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
896 use snarkos_account::Account;
897 use snarkos_node_bft_ledger_service::MockLedgerService;
898 use snarkos_node_bft_storage_service::BFTMemoryService;
899 use snarkvm::{
900 console::account::{Address, PrivateKey},
901 ledger::{
902 committee::Committee,
903 narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
904 },
905 utilities::TestRng,
906 };
907
908 use anyhow::Result;
909 use indexmap::{IndexMap, IndexSet};
910 use std::sync::Arc;
911
912 type CurrentNetwork = snarkvm::console::network::MainnetV0;
913
914 fn sample_test_instance(
916 committee_round: Option<u64>,
917 max_gc_rounds: u64,
918 rng: &mut TestRng,
919 ) -> (
920 Committee<CurrentNetwork>,
921 Account<CurrentNetwork>,
922 Arc<MockLedgerService<CurrentNetwork>>,
923 Storage<CurrentNetwork>,
924 ) {
925 let committee = match committee_round {
926 Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
927 None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
928 };
929 let account = Account::new(rng).unwrap();
930 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
931 let transmissions = Arc::new(BFTMemoryService::new());
932 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
933
934 (committee, account, ledger, storage)
935 }
936
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids as sample_round_certificate;

    let rng = &mut TestRng::default();

    // Sample four round-1 certificates without previous certificate IDs.
    let certificates: IndexSet<_> = (0..4).map(|_| sample_round_certificate(1, IndexSet::new(), rng)).collect();

    // Build a committee whose members are exactly the certificate authors.
    let members = (0..4).map(|i| certificates[i].author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(1, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
    assert!(bft.is_timer_expired());

    // Storage holds no certificates yet, so the check must fail.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(1));

    // With all four certificates in storage and no leader certificate set, it must pass.
    for certificate in &certificates {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    // It must still pass once a leader certificate is set.
    *bft.leader_certificate.write() = Some(sample_batch_certificate(rng));
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    Ok(())
}
990
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
    let rng = &mut TestRng::default();

    // Storage starts at round 1.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    assert!(bft.is_timer_expired());

    // Asking about round 2 while storage is at round 1 must fail.
    let is_available = bft.is_leader_quorum_or_nonleaders_available(2);
    assert!(!is_available);
    Ok(())
}
1012
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even() -> Result<()> {
    let rng = &mut TestRng::default();

    // Storage starts at round 2.
    let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
    assert_eq!(committee.starting_round(), 2);
    assert_eq!(storage.current_round(), 2);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, ledger, None, &[], None)?;
    assert!(bft.is_timer_expired());

    // The odd-round check must fail for the even round 2.
    let is_available = bft.is_leader_quorum_or_nonleaders_available(2);
    assert!(!is_available);
    Ok(())
}
1033
#[test]
#[tracing_test::traced_test]
fn test_is_even_round_ready() -> Result<()> {
    let rng = &mut TestRng::default();

    // Sample four round-2 certificates.
    let certificates: IndexSet<_> = (0..4).map(|_| sample_batch_certificate_for_round(2, rng)).collect();

    // Build a committee whose members are exactly the certificate authors.
    let members = (0..4).map(|i| certificates[i].author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    let account = Account::new(rng)?;

    // Case 1: a leader certificate for round 2 is set.
    let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
    *bft.leader_certificate.write() = Some(sample_batch_certificate_for_round(2, rng));
    // Without certificates the round is not ready.
    assert!(!bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2));
    // With all four certificates it is.
    assert!(bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2));

    // Case 2: no leader certificate is set, so readiness depends on the timer.
    let bft_timer = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
    let is_ready = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if !bft_timer.is_timer_expired() {
        assert!(!is_ready);
    }

    // Wait for the full leader certificate delay, then check again.
    std::thread::sleep(std::time::Duration::from_secs(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64));
    let is_ready = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    assert_eq!(is_ready, bft_timer.is_timer_expired());

    Ok(())
}
1097
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_odd() -> Result<()> {
    let rng = &mut TestRng::default();

    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, ledger, None, &[], None)?;

    // Updating the leader certificate for the odd round 1 must fail.
    let is_updated = bft.update_leader_certificate_to_even_round(1);
    assert!(!is_updated);
    Ok(())
}
1115
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_bad_round() -> Result<()> {
    let rng = &mut TestRng::default();

    let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, ledger, None, &[], None)?;

    // Updating the leader certificate for round 6 must fail on this fresh instance.
    let is_updated = bft.update_leader_certificate_to_even_round(6);
    assert!(!is_updated);
    Ok(())
}
1133
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates;

    let rng = &mut TestRng::default();

    // Sample a round-3 certificate; only its previous certificates are used below.
    let current_round = 3;
    let (_, certificates) = sample_batch_certificate_with_previous_certificates(current_round, rng);

    // Build a round-2 committee whose members are the authors of those certificates.
    let members = (0..4).map(|i| certificates[i].author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, rng);

    let ledger = Arc::new(MockLedgerService::new(committee.clone()));

    // Insert the four certificates into storage, which then reports round 2.
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
    for i in 0..4 {
        storage.testing_only_insert_certificate_testing_only(certificates[i].clone());
    }
    assert_eq!(storage.current_round(), 2);

    // Look up the certificate of the round-2 leader.
    let leader = committee.get_leader(2).unwrap();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();

    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), ledger, None, &[], None)?;

    *bft.leader_certificate.write() = Some(leader_certificate);

    // Updating the leader certificate for round 2 must succeed.
    let is_updated = bft.update_leader_certificate_to_even_round(2);
    assert!(is_updated);

    Ok(())
}
1190
1191 #[tokio::test]
1192 #[tracing_test::traced_test]
1193 async fn test_order_dag_with_dfs() -> Result<()> {
1194 let rng = &mut TestRng::default();
1195
1196 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1198
1199 let previous_round = 2; let current_round = previous_round + 1;
1202
1203 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1205 current_round,
1206 rng,
1207 );
1208
1209 {
1213 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1215 let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
1217
1218 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1220
1221 for certificate in previous_certificates.clone() {
1223 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1224 }
1225
1226 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1228 assert!(result.is_ok());
1229 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1230 assert_eq!(candidate_certificates.len(), 1);
1231 let expected_certificates = vec![certificate.clone()];
1232 assert_eq!(
1233 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1234 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1235 );
1236 assert_eq!(candidate_certificates, expected_certificates);
1237 }
1238
1239 {
1243 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1245 let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1247
1248 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1250
1251 for certificate in previous_certificates.clone() {
1253 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1254 }
1255
1256 let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1258 assert!(result.is_ok());
1259 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1260 assert_eq!(candidate_certificates.len(), 5);
1261 let expected_certificates = vec![
1262 previous_certificates[0].clone(),
1263 previous_certificates[1].clone(),
1264 previous_certificates[2].clone(),
1265 previous_certificates[3].clone(),
1266 certificate,
1267 ];
1268 assert_eq!(
1269 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1270 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1271 );
1272 assert_eq!(candidate_certificates, expected_certificates);
1273 }
1274
1275 Ok(())
1276 }
1277
/// Checks that `order_dag_with_dfs` fails with a "Missing previous certificate" error when none of
/// the certificate's previous certificates have been added to the DAG.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates;

    let rng = &mut TestRng::default();

    // Sample the test instance and sanity-check its starting state.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample a certificate for the current round; its previous certificates are never inserted.
    let (certificate, previous_certificates) =
        sample_batch_certificate_with_previous_certificates(current_round, rng);
    let previous_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();

    let bft = BFT::new(account, storage, ledger, None, &[], None)?;

    // The error is expected to name the previous certificate ID at index 3.
    let expected_message = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_ids[3]),
    );

    // Ordering must fail with exactly that message.
    let outcome = bft.order_dag_with_dfs::<false>(certificate);
    assert!(outcome.is_err());
    let actual_message = outcome.unwrap_err().to_string();
    assert_eq!(actual_message, expected_message);

    Ok(())
}
1318
1319 #[tokio::test]
1320 #[tracing_test::traced_test]
1321 async fn test_bft_gc_on_commit() -> Result<()> {
1322 let rng = &mut TestRng::default();
1323
1324 let max_gc_rounds = 1;
1326 let committee_round = 0;
1327 let commit_round = 2;
1328 let current_round = commit_round + 1;
1329
1330 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1332 current_round,
1333 rng,
1334 );
1335
1336 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1338 committee_round,
1339 vec![
1340 certificates[0].author(),
1341 certificates[1].author(),
1342 certificates[2].author(),
1343 certificates[3].author(),
1344 ],
1345 rng,
1346 );
1347
1348 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1350
1351 let transmissions = Arc::new(BFTMemoryService::new());
1353 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1354 for certificate in certificates.iter() {
1356 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1357 }
1358
1359 let leader = committee.get_leader(commit_round).unwrap();
1361 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1362
1363 let account = Account::new(rng)?;
1365 let bft = BFT::new(account, storage.clone(), ledger, None, &[], None)?;
1366 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1368
1369 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1371
1372 for certificate in certificates {
1374 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1375 }
1376
1377 bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1379
1380 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1382
1383 Ok(())
1384 }
1385
1386 #[tokio::test]
1387 #[tracing_test::traced_test]
1388 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1389 let rng = &mut TestRng::default();
1390
1391 let max_gc_rounds = 1;
1393 let committee_round = 0;
1394 let commit_round = 2;
1395 let current_round = commit_round + 1;
1396
1397 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1399 current_round,
1400 rng,
1401 );
1402
1403 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1405 committee_round,
1406 vec![
1407 certificates[0].author(),
1408 certificates[1].author(),
1409 certificates[2].author(),
1410 certificates[3].author(),
1411 ],
1412 rng,
1413 );
1414
1415 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1417
1418 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1420 for certificate in certificates.iter() {
1422 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1423 }
1424
1425 let leader = committee.get_leader(commit_round).unwrap();
1427 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1428
1429 let account = Account::new(rng)?;
1431 let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
1432
1433 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1435
1436 for certificate in certificates.clone() {
1438 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1439 }
1440
1441 bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1443
1444 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1448 let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], None)?;
1450
1451 bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1453
1454 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1456
1457 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1459 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1460
1461 for certificate in certificates {
1463 let certificate_round = certificate.round();
1464 let certificate_id = certificate.id();
1465 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1467 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1470 }
1471
1472 Ok(())
1473 }
1474
1475 #[tokio::test]
1476 #[tracing_test::traced_test]
1477 async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1478 let rng = &mut TestRng::default();
1485
1486 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1488 let committee_round = 0;
1489 let commit_round = 2;
1490 let current_round = commit_round + 1;
1491 let next_round = current_round + 1;
1492
1493 let (round_to_certificates_map, committee) = {
1495 let private_keys = vec![
1496 PrivateKey::new(rng).unwrap(),
1497 PrivateKey::new(rng).unwrap(),
1498 PrivateKey::new(rng).unwrap(),
1499 PrivateKey::new(rng).unwrap(),
1500 ];
1501 let addresses = vec![
1502 Address::try_from(private_keys[0])?,
1503 Address::try_from(private_keys[1])?,
1504 Address::try_from(private_keys[2])?,
1505 Address::try_from(private_keys[3])?,
1506 ];
1507 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1508 committee_round,
1509 addresses,
1510 rng,
1511 );
1512 let mut round_to_certificates_map: IndexMap<
1514 u64,
1515 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1516 > = IndexMap::new();
1517 let mut previous_certificates = IndexSet::with_capacity(4);
1518 for _ in 0..4 {
1520 previous_certificates.insert(sample_batch_certificate(rng));
1521 }
1522 for round in 0..commit_round + 3 {
1523 let mut current_certificates = IndexSet::new();
1524 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1525 IndexSet::new()
1526 } else {
1527 previous_certificates.iter().map(|c| c.id()).collect()
1528 };
1529 let transmission_ids =
1530 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1531 .into_iter()
1532 .collect::<IndexSet<_>>();
1533 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1534 let committee_id = committee.id();
1535 for (i, private_key_1) in private_keys.iter().enumerate() {
1536 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1537 private_key_1,
1538 round,
1539 timestamp,
1540 committee_id,
1541 transmission_ids.clone(),
1542 previous_certificate_ids.clone(),
1543 rng,
1544 )
1545 .unwrap();
1546 let mut signatures = IndexSet::with_capacity(4);
1547 for (j, private_key_2) in private_keys.iter().enumerate() {
1548 if i != j {
1549 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1550 }
1551 }
1552 let certificate =
1553 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1554 current_certificates.insert(certificate);
1555 }
1556 round_to_certificates_map.insert(round, current_certificates.clone());
1558 previous_certificates = current_certificates.clone();
1559 }
1560 (round_to_certificates_map, committee)
1561 };
1562
1563 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1565 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1567 let leader = committee.get_leader(commit_round).unwrap();
1569 let next_leader = committee.get_leader(next_round).unwrap();
1570 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1572 for i in 1..=commit_round {
1573 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1574 if i == commit_round {
1575 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1577 if let Some(c) = leader_certificate {
1578 pre_shutdown_certificates.push(c.clone());
1579 }
1580 continue;
1581 }
1582 pre_shutdown_certificates.extend(certificates);
1583 }
1584 for certificate in pre_shutdown_certificates.iter() {
1585 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1586 }
1587 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1589 Vec::new();
1590 for j in commit_round..=commit_round + 2 {
1591 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1592 post_shutdown_certificates.extend(certificate);
1593 }
1594 for certificate in post_shutdown_certificates.iter() {
1595 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1596 }
1597 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1599 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1600
1601 let account = Account::new(rng)?;
1603 let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
1604
1605 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1607
1608 for certificate in pre_shutdown_certificates.clone() {
1610 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1611 }
1612
1613 for certificate in post_shutdown_certificates.clone() {
1615 assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1616 }
1617 let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1619 let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1620 bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1621
1622 let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1626
1627 let bootup_bft = BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], None)?;
1629
1630 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1632
1633 for certificate in post_shutdown_certificates.iter() {
1635 bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1636 }
1637 for certificate in post_shutdown_certificates.clone() {
1638 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1639 }
1640 let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1642 let commit_subdag_metadata_bootup =
1643 commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1644 let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1645 bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1646
1647 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1651
1652 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1654 assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1655 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1656 assert!(
1657 bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1658 );
1659
1660 for certificate in pre_shutdown_certificates.clone() {
1662 let certificate_round = certificate.round();
1663 let certificate_id = certificate.id();
1664 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1666 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1667 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1670 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1671 }
1672
1673 for certificate in committed_certificates_bootup.clone() {
1675 let certificate_round = certificate.round();
1676 let certificate_id = certificate.id();
1677 assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1679 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1680 assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1683 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1684 }
1685
1686 assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1688
1689 Ok(())
1690 }
1691
1692 #[tokio::test]
1693 #[tracing_test::traced_test]
1694 async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1695 let rng = &mut TestRng::default();
1702
1703 let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1705 let committee_round = 0;
1706 let commit_round = 2;
1707 let current_round = commit_round + 1;
1708 let next_round = current_round + 1;
1709
1710 let (round_to_certificates_map, committee) = {
1712 let private_keys = vec![
1713 PrivateKey::new(rng).unwrap(),
1714 PrivateKey::new(rng).unwrap(),
1715 PrivateKey::new(rng).unwrap(),
1716 PrivateKey::new(rng).unwrap(),
1717 ];
1718 let addresses = vec![
1719 Address::try_from(private_keys[0])?,
1720 Address::try_from(private_keys[1])?,
1721 Address::try_from(private_keys[2])?,
1722 Address::try_from(private_keys[3])?,
1723 ];
1724 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1725 committee_round,
1726 addresses,
1727 rng,
1728 );
1729 let mut round_to_certificates_map: IndexMap<
1731 u64,
1732 IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1733 > = IndexMap::new();
1734 let mut previous_certificates = IndexSet::with_capacity(4);
1735 for _ in 0..4 {
1737 previous_certificates.insert(sample_batch_certificate(rng));
1738 }
1739 for round in 0..=commit_round + 2 {
1740 let mut current_certificates = IndexSet::new();
1741 let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1742 IndexSet::new()
1743 } else {
1744 previous_certificates.iter().map(|c| c.id()).collect()
1745 };
1746 let transmission_ids =
1747 snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1748 .into_iter()
1749 .collect::<IndexSet<_>>();
1750 let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1751 let committee_id = committee.id();
1752 for (i, private_key_1) in private_keys.iter().enumerate() {
1753 let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1754 private_key_1,
1755 round,
1756 timestamp,
1757 committee_id,
1758 transmission_ids.clone(),
1759 previous_certificate_ids.clone(),
1760 rng,
1761 )
1762 .unwrap();
1763 let mut signatures = IndexSet::with_capacity(4);
1764 for (j, private_key_2) in private_keys.iter().enumerate() {
1765 if i != j {
1766 signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1767 }
1768 }
1769 let certificate =
1770 snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1771 current_certificates.insert(certificate);
1772 }
1773 round_to_certificates_map.insert(round, current_certificates.clone());
1775 previous_certificates = current_certificates.clone();
1776 }
1777 (round_to_certificates_map, committee)
1778 };
1779
1780 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1782 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1784 let leader = committee.get_leader(commit_round).unwrap();
1786 let next_leader = committee.get_leader(next_round).unwrap();
1787 let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1789 for i in 1..=commit_round {
1790 let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1791 if i == commit_round {
1792 let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1794 if let Some(c) = leader_certificate {
1795 pre_shutdown_certificates.push(c.clone());
1796 }
1797 continue;
1798 }
1799 pre_shutdown_certificates.extend(certificates);
1800 }
1801 for certificate in pre_shutdown_certificates.iter() {
1802 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1803 }
1804 let account = Account::new(rng)?;
1806 let bootup_bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
1807 *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1809 bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1811
1812 let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1814 Vec::new();
1815 for j in commit_round..=commit_round + 2 {
1816 let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1817 post_shutdown_certificates.extend(certificate);
1818 }
1819 for certificate in post_shutdown_certificates.iter() {
1820 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1821 }
1822
1823 for certificate in post_shutdown_certificates.clone() {
1825 assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1826 }
1827
1828 let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1830 let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1831 let committed_certificates = commit_subdag.values().flatten();
1832
1833 for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1835 for committed_certificate in committed_certificates.clone() {
1836 assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1837 }
1838 }
1839 Ok(())
1840 }
1841}