1use crate::{
17 MAX_LEADER_CERTIFICATE_DELAY,
18 helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
19 primary::{Primary, PrimaryCallback},
20 sync::SyncCallback,
21};
22
23use snarkos_account::Account;
24use snarkos_node_bft_ledger_service::LedgerService;
25use snarkos_node_sync::{BlockSync, Ping};
26use snarkos_utilities::NodeDataDir;
27
28use snarkvm::{
29 console::account::Address,
30 ledger::{
31 block::Transaction,
32 committee::Committee,
33 narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
34 puzzle::{Solution, SolutionID},
35 },
36 prelude::{Field, Network, Result, bail, ensure},
37 utilities::flatten_error,
38};
39
40use anyhow::Context;
41use colored::Colorize;
42use indexmap::{IndexMap, IndexSet};
43#[cfg(feature = "locktick")]
44use locktick::parking_lot::RwLock;
45#[cfg(feature = "locktick")]
46use locktick::tokio::Mutex;
47#[cfg(not(feature = "locktick"))]
48use parking_lot::RwLock;
49use std::{
50 collections::{BTreeMap, HashSet},
51 net::SocketAddr,
52 sync::{
53 Arc,
54 atomic::{AtomicI64, Ordering},
55 },
56};
57#[cfg(not(feature = "locktick"))]
58use tokio::sync::Mutex;
59use tokio::sync::{OnceCell, oneshot};
60
/// The BFT instance, which wraps a [`Primary`] together with the DAG and the state
/// used to select and commit leader certificates.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary that this BFT instance runs and delegates to.
    primary: Primary<N>,
    /// The DAG of batch certificates.
    dag: Arc<RwLock<DAG<N>>>,
    /// The leader certificate selected for the current even round, if one was found.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timestamp (from `now()`) stored after the last successful round increment; starts at zero.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The sender used to hand committed subdags to consensus; set at most once, in `run`.
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// The async lock held for the duration of `commit_leader_certificate`.
    commit_lock: Arc<Mutex<()>>,
}
80
81impl<N: Network> BFT<N> {
82 #[allow(clippy::too_many_arguments)]
84 pub fn new(
85 account: Account<N>,
86 storage: Storage<N>,
87 ledger: Arc<dyn LedgerService<N>>,
88 block_sync: Arc<BlockSync<N>>,
89 ip: Option<SocketAddr>,
90 trusted_validators: &[SocketAddr],
91 trusted_peers_only: bool,
92 node_data_dir: NodeDataDir,
93 dev: Option<u16>,
94 ) -> Result<Self> {
95 Ok(Self {
96 primary: Primary::new(
97 account,
98 storage,
99 ledger,
100 block_sync,
101 ip,
102 trusted_validators,
103 trusted_peers_only,
104 node_data_dir,
105 dev,
106 )?,
107 dag: Default::default(),
108 leader_certificate: Default::default(),
109 leader_certificate_timer: Default::default(),
110 consensus_sender: Default::default(),
111 commit_lock: Default::default(),
112 })
113 }
114
115 pub async fn run(
120 &mut self,
121 ping: Option<Arc<Ping<N>>>,
122 consensus_sender: Option<ConsensusSender<N>>,
123 primary_sender: PrimarySender<N>,
124 primary_receiver: PrimaryReceiver<N>,
125 ) -> Result<()> {
126 info!("Starting the BFT instance...");
127 let primary_callback = Some(Arc::new(self.clone()) as Arc<dyn PrimaryCallback<N>>);
129
130 let sync_callback = Some(Arc::new(self.clone()) as Arc<dyn SyncCallback<N>>);
131
132 self.primary.run(ping, primary_callback, sync_callback, primary_sender, primary_receiver).await?;
134
135 if let Some(consensus_sender) = consensus_sender {
138 self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
139 }
140 Ok(())
141 }
142
143 pub fn is_synced(&self) -> bool {
145 self.primary.is_synced()
146 }
147
148 pub const fn primary(&self) -> &Primary<N> {
150 &self.primary
151 }
152
153 pub const fn storage(&self) -> &Storage<N> {
155 self.primary.storage()
156 }
157
158 pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
160 self.primary.ledger()
161 }
162
163 pub fn leader(&self) -> Option<Address<N>> {
165 self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
166 }
167
168 pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
170 &self.leader_certificate
171 }
172}
173
174impl<N: Network> BFT<N> {
175 pub fn num_unconfirmed_transmissions(&self) -> usize {
177 self.primary.num_unconfirmed_transmissions()
178 }
179
180 pub fn num_unconfirmed_ratifications(&self) -> usize {
182 self.primary.num_unconfirmed_ratifications()
183 }
184
185 pub fn num_unconfirmed_solutions(&self) -> usize {
187 self.primary.num_unconfirmed_solutions()
188 }
189
190 pub fn num_unconfirmed_transactions(&self) -> usize {
192 self.primary.num_unconfirmed_transactions()
193 }
194}
195
196impl<N: Network> BFT<N> {
197 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
199 self.primary.worker_transmission_ids()
200 }
201
202 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
204 self.primary.worker_transmissions()
205 }
206
207 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
209 self.primary.worker_solutions()
210 }
211
212 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
214 self.primary.worker_transactions()
215 }
216}
217
218#[async_trait::async_trait]
219impl<N: Network> PrimaryCallback<N> for BFT<N> {
220 fn try_advance_to_next_round(&self, current_round: u64) -> bool {
228 let storage_round = self.storage().current_round();
230 if current_round < storage_round {
231 debug!(
232 "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
233 );
234 return false;
235 }
236
237 let is_ready = match current_round.is_multiple_of(2) {
239 true => self.update_leader_certificate_to_even_round(current_round),
240 false => self.is_leader_quorum_or_nonleaders_available(current_round),
241 };
242
243 #[cfg(feature = "metrics")]
244 {
245 let start = self.leader_certificate_timer.load(Ordering::SeqCst);
246 if start > 0 {
248 let end = now();
249 let elapsed = std::time::Duration::from_secs((end - start) as u64);
250 metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
251 }
252 }
253
254 if current_round.is_multiple_of(2) {
256 if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
258 if !is_ready {
260 trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
261 }
262 let leader_round = leader_certificate.round();
264 match leader_round == current_round {
265 true => {
266 info!("Round {current_round} elected a leader - {}", leader_certificate.author());
267 #[cfg(feature = "metrics")]
268 metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
269 }
270 false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
271 }
272 } else {
273 match is_ready {
274 true => info!("Round {current_round} reached quorum without a leader"),
275 false => info!("{}", format!("Round {current_round} did not elect a leader (yet)").dimmed()),
276 }
277 }
278 }
279
280 if is_ready {
282 if let Err(err) = self
284 .storage()
285 .increment_to_next_round(current_round)
286 .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
287 {
288 warn!("{}", &flatten_error(err));
289 return false;
290 }
291 self.leader_certificate_timer.store(now(), Ordering::SeqCst);
293 }
294
295 is_ready
296 }
297
    /// Inserts the given certificate into the DAG and, if the preceding even round has a leader
    /// certificate that is referenced by enough certificates of the new certificate's round,
    /// commits that leader certificate.
    ///
    /// # Errors
    /// Returns an error if a committee lookback, the leader, or the certificates of the
    /// certificate's round cannot be retrieved, or if the commit itself fails.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
        // Read the round before the certificate is moved into the DAG.
        let certificate_round = certificate.round();

        // Insert the certificate into the DAG.
        self.dag.write().insert(certificate);

        // The candidate round to commit is the one preceding the certificate's round.
        let commit_round = certificate_round.saturating_sub(1);

        // Only even rounds, starting from round 2, are candidates for a commit.
        if !commit_round.is_multiple_of(2) || commit_round < 2 {
            return Ok(());
        }
        // Skip if the DAG has already committed this round (or a later one).
        if commit_round <= self.dag.read().last_committed_round() {
            return Ok(());
        }

        trace!("Checking if the leader is ready to be committed for round {commit_round}...");

        // Retrieve the committee lookback for the commit round.
        let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
            format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
        })?;

        // Use the leader cached in the ledger if it is for this round; otherwise compute and cache it.
        let leader = match self.ledger().latest_leader() {
            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
            _ => {
                let computed_leader = committee_lookback
                    .get_leader(commit_round)
                    .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;

                self.ledger().update_latest_leader(commit_round, computed_leader);

                computed_leader
            }
        };

        // Look up the leader's certificate for the commit round; without it there is nothing to commit yet.
        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
        else {
            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
            return Ok(());
        };
        // Retrieve all certificates of the new certificate's round from the DAG.
        let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
            format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
        })?;

        // Retrieve the committee lookback for the certificate's round.
        let certificate_committee_lookback =
            self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
                format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
            })?;

        // Collect the authors whose certificates reference the leader certificate.
        let authors = certificates
            .values()
            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
                true => Some(c.author()),
                false => None,
            })
            .collect();

        // Commit only once those authors reach the availability threshold.
        if certificate_committee_lookback.is_availability_threshold_reached(&authors) {
            info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
            self.commit_leader_certificate(leader_certificate).await
        } else {
            trace!(
                "BFT is not ready to commit round {commit_round} with leader '{}' - Availability threshold has not been reached yet",
                fmt_id(leader)
            );
            Ok(())
        }
    }
383}
384
385#[async_trait::async_trait]
386impl<N: Network> SyncCallback<N> for BFT<N> {
387 fn add_certificate_from_sync(&self, certificate: BatchCertificate<N>) {
389 self.dag.write().insert(certificate);
390 }
391
392 fn commit_certificate_from_sync(&self, certificate: &BatchCertificate<N>) {
394 self.dag.write().commit(certificate, self.storage().max_gc_rounds());
395 }
396}
397
398impl<N: Network> BFT<N> {
399 fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
405 let current_round = self.storage().current_round();
407 if current_round != even_round {
409 warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
410 return false;
411 }
412
413 if !current_round.is_multiple_of(2) || current_round < 2 {
415 error!("BFT cannot update the leader certificate in an odd round");
416 return false;
417 }
418
419 let current_certificates = self.storage().get_certificates_for_round(current_round);
421 if current_certificates.is_empty() {
423 *self.leader_certificate.write() = None;
425 return false;
426 }
427
428 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
430 Ok(committee) => committee,
431 Err(err) => {
432 let err = err.context(format!(
433 "BFT failed to retrieve the committee lookback for the even round {current_round}"
434 ));
435 warn!("{}", &flatten_error(err));
436 return false;
437 }
438 };
439 let leader = match self.ledger().latest_leader() {
441 Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
442 _ => {
443 let computed_leader = match committee_lookback.get_leader(current_round) {
445 Ok(leader) => leader,
446 Err(err) => {
447 let err =
448 err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
449 error!("{}", &flatten_error(err));
450 return false;
451 }
452 };
453
454 self.ledger().update_latest_leader(current_round, computed_leader);
456
457 computed_leader
458 }
459 };
460 let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
462 *self.leader_certificate.write() = leader_certificate.cloned();
463
464 self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
465 }
466
467 fn is_even_round_ready_for_next_round(
471 &self,
472 certificates: IndexSet<BatchCertificate<N>>,
473 committee: Committee<N>,
474 current_round: u64,
475 ) -> bool {
476 let authors = certificates.into_iter().map(|c| c.author()).collect();
478 if !committee.is_quorum_threshold_reached(&authors) {
480 trace!("BFT failed to reach quorum threshold in even round {current_round}");
481 return false;
482 }
483 if let Some(leader_certificate) = self.leader_certificate.read().as_ref()
485 && leader_certificate.round() == current_round
486 {
487 return true;
488 }
489 if self.is_timer_expired() {
491 debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
492 return true;
493 }
494 false
496 }
497
498 fn is_timer_expired(&self) -> bool {
502 self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY.as_secs() as i64 <= now()
503 }
504
505 fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
510 let current_round = self.storage().current_round();
512 if current_round != odd_round {
514 warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
515 return false;
516 }
517 if current_round % 2 != 1 {
519 error!("BFT does not compute stakes for the leader certificate in an even round");
520 return false;
521 }
522 let current_certificates = self.storage().get_certificates_for_round(current_round);
524 let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
526 Ok(committee) => committee,
527 Err(err) => {
528 let err = err.context(format!(
529 "BFT failed to retrieve the committee lookback for the odd round {current_round}"
530 ));
531 error!("{}", &flatten_error(err));
532 return false;
533 }
534 };
535 let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
537 if !committee_lookback.is_quorum_threshold_reached(&authors) {
539 trace!("BFT failed reach quorum threshold in odd round {current_round}.");
540 return false;
541 }
542 let Some(leader_certificate) = self.leader_certificate.read().clone() else {
544 return true;
546 };
547 let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
549 leader_certificate.id(),
550 current_certificates,
551 &committee_lookback,
552 );
553 stake_with_leader >= committee_lookback.availability_threshold()
555 || stake_without_leader >= committee_lookback.quorum_threshold()
556 || self.is_timer_expired()
557 }
558
559 fn compute_stake_for_leader_certificate(
561 &self,
562 leader_certificate_id: Field<N>,
563 current_certificates: IndexSet<BatchCertificate<N>>,
564 current_committee: &Committee<N>,
565 ) -> (u64, u64) {
566 if current_certificates.is_empty() {
568 return (0, 0);
569 }
570
571 let mut stake_with_leader = 0u64;
573 let mut stake_without_leader = 0u64;
575 for certificate in current_certificates {
577 let stake = current_committee.get_stake(certificate.author());
579 match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
581 true => stake_with_leader = stake_with_leader.saturating_add(stake),
583 false => stake_without_leader = stake_without_leader.saturating_add(stake),
585 }
586 }
587 (stake_with_leader, stake_without_leader)
589 }
590}
591
592impl<N: Network> BFT<N> {
    /// Commits the given leader certificate, along with any earlier, not-yet-committed leader
    /// certificates that are linked to it.
    ///
    /// For each leader certificate (oldest first), the DAG is ordered with DFS, the missing
    /// transmissions are collected from storage, the resulting subdag is sent to consensus
    /// (unless skipped), and the certificates are marked as committed in the DAG. Finally,
    /// storage is garbage collected up to the latest leader round.
    ///
    /// # Errors
    /// Returns an error if a committee lookback, leader, or transmission cannot be retrieved,
    /// if the DAG cannot be ordered, if a ratification is encountered, if the subdag is invalid,
    /// if sending to consensus fails, or if garbage collection fails. A failure *reported by*
    /// consensus is logged and results in an early `Ok(())` instead.
    async fn commit_leader_certificate(&self, leader_certificate: BatchCertificate<N>) -> Result<()> {
        #[cfg(feature = "metrics")]
        let start = std::time::Instant::now();
        #[cfg(debug_assertions)]
        trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());

        // Hold the commit lock for the rest of this function.
        let _commit_guard = self.commit_lock.lock().await;

        let latest_leader_round = leader_certificate.round();

        // The leader certificates to commit, newest first.
        let mut leader_certificates = vec![leader_certificate.clone()];
        // Whether to skip sending the subdags to consensus (set below, under the DAG read lock).
        let skip_consensus;
        {
            // Scope the DAG read lock, so that it is released before any `.await` below.
            let dag = self.dag.read();

            // A round older than the last committed round has nothing left to commit.
            if latest_leader_round < dag.last_committed_round() {
                trace!("Skipping already-committed leader round {latest_leader_round}");
                return Ok(());
            }
            // If the DAG already committed exactly this round, consensus is not notified again.
            skip_consensus = latest_leader_round == dag.last_committed_round();

            // NOTE(review): this repeats the trace emitted at the top of the function - confirm it is intended.
            #[cfg(debug_assertions)]
            trace!("Attempting to commit leader certificate for round {}...", latest_leader_round);

            // Walk backwards, two rounds at a time, from two rounds before the latest leader round
            // down to two rounds after the last committed round.
            let mut current_certificate = leader_certificate;
            for round in (dag.last_committed_round() + 2..=latest_leader_round.saturating_sub(2)).rev().step_by(2) {
                let previous_committee_lookback =
                    self.ledger().get_committee_lookback_for_round(round).with_context(|| {
                        format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
                    })?;

                // Use the leader cached in the ledger if it is for this round; otherwise compute and cache it.
                let leader = match self.ledger().latest_leader() {
                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
                    _ => {
                        let computed_leader = previous_committee_lookback
                            .get_leader(round)
                            .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;

                        self.ledger().update_latest_leader(round, computed_leader);

                        computed_leader
                    }
                };
                // Skip rounds for which the DAG does not hold the leader's certificate.
                let Some(previous_certificate) = dag.get_certificate_for_round_with_author(round, leader) else {
                    continue;
                };
                // Only leader certificates linked to the current one join the commit chain.
                if dag.is_linked(previous_certificate.clone(), current_certificate.clone())? {
                    leader_certificates.push(previous_certificate.clone());
                    // Subsequent (older) leaders are checked against this certificate.
                    current_certificate = previous_certificate;
                } else {
                    #[cfg(debug_assertions)]
                    trace!(
                        "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
                    );
                }
            }
        }

        // Commit the leader certificates from the oldest to the newest.
        for leader_certificate in leader_certificates.into_iter().rev() {
            let leader_round = leader_certificate.round();
            // Order the certificates reachable from this leader, grouped by round.
            let commit_subdag =
                self.order_dag_with_dfs(leader_certificate).with_context(|| "BFT failed to order the DAG with DFS")?;
            // The transmissions to hand to consensus, in insertion order.
            let mut transmissions = IndexMap::new();
            // The transaction and solution IDs already collected for this subdag.
            let mut seen_transaction_ids = IndexSet::new();
            let mut seen_solution_ids = IndexSet::new();
            for certificate in commit_subdag.values().flatten() {
                for transmission_id in certificate.transmission_ids() {
                    // Skip IDs that were already collected; ratifications are rejected outright.
                    match transmission_id {
                        TransmissionID::Solution(solution_id, _) => {
                            if seen_solution_ids.contains(&solution_id) {
                                continue;
                            }
                        }
                        TransmissionID::Transaction(transaction_id, _) => {
                            if seen_transaction_ids.contains(transaction_id) {
                                continue;
                            }
                        }
                        TransmissionID::Ratification => {
                            bail!("Ratifications are currently not supported in the BFT.")
                        }
                    }
                    // Skip transmissions that were already collected under the same full ID.
                    if transmissions.contains_key(transmission_id) {
                        continue;
                    }
                    // Skip transmissions the ledger already contains; a failed lookup is treated
                    // as "already contained", so the transmission is skipped as well.
                    if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
                        continue;
                    }
                    // Retrieve the transmission from storage.
                    let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
                        format!(
                            "BFT failed to retrieve transmission '{}.{}' from round {}",
                            fmt_id(transmission_id),
                            fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
                            certificate.round()
                        )
                    })?;
                    // Record the ID as seen.
                    match transmission_id {
                        TransmissionID::Solution(id, _) => {
                            seen_solution_ids.insert(id);
                        }
                        TransmissionID::Transaction(id, _) => {
                            seen_transaction_ids.insert(id);
                        }
                        TransmissionID::Ratification => {}
                    }
                    transmissions.insert(*transmission_id, transmission);
                }
            }
            // Build the subdag from a clone, as `commit_subdag` is still needed below.
            let subdag = Subdag::from(commit_subdag.clone())?;
            // Capture the values used for logging before the subdag is moved to consensus.
            let anchor_round = subdag.anchor_round();
            let num_transmissions = transmissions.len();
            let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();

            // The subdag must be anchored at the leader's round.
            ensure!(
                anchor_round == leader_round,
                "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
            );

            if !skip_consensus {
                // Consensus is only notified if a consensus sender has been set.
                if let Some(consensus_sender) = self.consensus_sender.get() {
                    let (callback_sender, callback_receiver) = oneshot::channel();
                    // Send the subdag and its transmissions to consensus, then await the outcome.
                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
                    match callback_receiver.await {
                        Ok(Ok(_)) => (),
                        // Consensus reported a failure: log it and stop, without committing to the DAG
                        // and without garbage collecting.
                        Ok(Err(err)) => {
                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
                            error!("{}", &flatten_error(err));
                            return Ok(());
                        }
                        // The callback could not be received: log it and stop in the same way.
                        Err(err) => {
                            let err: anyhow::Error = err.into();
                            let err =
                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
                            error!("{}", flatten_error(err));
                            return Ok(());
                        }
                    }
                }
            }

            info!(
                "Committing a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?}",
            );

            {
                // Mark every certificate of the subdag as committed, under a single DAG write lock.
                let mut dag_write = self.dag.write();
                let mut count = 0;
                for certificate in commit_subdag.values().flatten() {
                    dag_write.commit(certificate, self.storage().max_gc_rounds());
                    count += 1;
                }

                trace!("Committed {count} certificates to the DAG");
            }

            // Record the committed subdag in the validator telemetry.
            #[cfg(feature = "telemetry")]
            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
        }

        // Garbage collect the storage up to the latest leader round.
        self.storage()
            .garbage_collect_certificates(latest_leader_round)
            .with_context(|| "BFT failed to garbage collect certificates")?;

        #[cfg(feature = "metrics")]
        metrics::histogram(metrics::bft::COMMIT_LEADER_CERTIFICATE_LATENCY, start.elapsed().as_secs_f64());
        Ok(())
    }
830
    /// Collects the certificates reachable from the given leader certificate through their
    /// previous certificate IDs, grouped by round, using an iterative depth-first traversal.
    ///
    /// Certificates that are recently committed in the DAG or already contained in the ledger
    /// are not traversed, and rounds that fall outside the GC range of the last committed round
    /// are dropped from the result.
    ///
    /// # Errors
    /// Returns an error if a previous certificate is found neither in the DAG nor in storage.
    fn order_dag_with_dfs(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
        // The certificates to commit, keyed (and therefore sorted) by round.
        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
        // The IDs of certificates that have already been pushed onto the traversal stack.
        let mut already_ordered = HashSet::new();
        // The traversal stack, seeded with the leader certificate.
        let mut buffer = vec![leader_certificate];
        while let Some(certificate) = buffer.pop() {
            // Add the certificate to the set of its round.
            commit.entry(certificate.round()).or_default().insert(certificate.clone());

            // Do not descend into rounds outside the GC range of the last committed round.
            let previous_round = certificate.round().saturating_sub(1);
            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
                continue;
            }
            // Visit the previous certificate IDs in reverse, so that popping the stack
            // processes them in their original order.
            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
                // Skip certificates that were already scheduled.
                if already_ordered.contains(previous_certificate_id) {
                    continue;
                }
                // Skip certificates that the DAG reports as recently committed.
                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
                    continue;
                }
                // Skip certificates already in the ledger; a failed lookup counts as "not contained".
                if self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
                    continue;
                }

                // Look up the previous certificate in the DAG first, then fall back to storage.
                let previous_certificate = {
                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
                        Some(previous_certificate) => previous_certificate,
                        None => match self.storage().get_certificate(*previous_certificate_id) {
                            Some(previous_certificate) => previous_certificate,
                            None => bail!(
                                "Missing previous certificate {} for round {previous_round}",
                                fmt_id(previous_certificate_id)
                            ),
                        },
                    }
                };
                // Mark the certificate as scheduled, and push it onto the stack.
                already_ordered.insert(previous_certificate.id());
                buffer.push(previous_certificate);
            }
        }
        // Drop the rounds that fall outside the GC range of the last committed round.
        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
        Ok(commit)
    }
901
902 pub async fn shut_down(&self) {
904 info!("Shutting down the BFT...");
905 self.primary.shut_down().await;
907 }
908}
909
#[cfg(test)]
impl<N: Network> BFT<N> {
    /// Returns the last committed round of the DAG (test builds only).
    pub fn testing_only_latest_committed_round(&self) -> u64 {
        let dag = self.dag.read();
        dag.last_committed_round()
    }
}
918
919#[cfg(test)]
920mod tests {
921 use crate::{
922 BFT,
923 MAX_LEADER_CERTIFICATE_DELAY,
924 PrimaryCallback,
925 helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
926 sync::SyncCallback,
927 };
928
929 use snarkos_account::Account;
930 use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
931 use snarkos_node_bft_storage_service::BFTMemoryService;
932 use snarkos_node_network::ConnectionMode;
933 use snarkos_node_sync::BlockSync;
934 use snarkos_utilities::NodeDataDir;
935
936 use snarkvm::{
937 console::account::{Address, PrivateKey},
938 ledger::{
939 committee::{
940 Committee,
941 test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
942 },
943 narwhal::{
944 BatchCertificate,
945 batch_certificate::test_helpers::{
946 sample_batch_certificate,
947 sample_batch_certificate_for_round,
948 sample_batch_certificate_for_round_with_committee,
949 },
950 },
951 },
952 utilities::TestRng,
953 };
954
955 use anyhow::Result;
956 use indexmap::{IndexMap, IndexSet};
957 use std::sync::Arc;
958
    /// The network that the tests in this module are instantiated with.
    type CurrentNetwork = snarkvm::console::network::MainnetV0;
960
961 fn sample_test_instance(
963 committee_round: Option<u64>,
964 max_gc_rounds: u64,
965 rng: &mut TestRng,
966 ) -> (
967 Committee<CurrentNetwork>,
968 Account<CurrentNetwork>,
969 Arc<MockLedgerService<CurrentNetwork>>,
970 Storage<CurrentNetwork>,
971 ) {
972 let committee = match committee_round {
973 Some(round) => sample_committee_for_round(round, rng),
974 None => sample_committee(rng),
975 };
976 let account = Account::new(rng).unwrap();
977 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
978 let transmissions = Arc::new(BFTMemoryService::new());
979 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds).unwrap();
980
981 (committee, account, ledger, storage)
982 }
983
984 fn initialize_bft(
986 account: Account<CurrentNetwork>,
987 storage: Storage<CurrentNetwork>,
988 ledger: Arc<MockLedgerService<CurrentNetwork>>,
989 ) -> anyhow::Result<BFT<CurrentNetwork>> {
990 let block_sync = Arc::new(BlockSync::new(ledger.clone(), ConnectionMode::Gateway));
992 BFT::new(
994 account.clone(),
995 storage.clone(),
996 ledger.clone(),
997 block_sync,
998 None,
999 &[],
1000 false,
1001 NodeDataDir::new_test(None),
1002 None,
1003 )
1004 }
1005
1006 #[test]
1007 #[tracing_test::traced_test]
1008 fn test_is_leader_quorum_odd() -> Result<()> {
1009 let rng = &mut TestRng::default();
1010
1011 let mut certificates = IndexSet::new();
1013 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1014 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1015 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1016 certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1017
1018 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1020 1,
1021 vec![
1022 certificates[0].author(),
1023 certificates[1].author(),
1024 certificates[2].author(),
1025 certificates[3].author(),
1026 ],
1027 rng,
1028 );
1029
1030 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1032 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
1034 let account = Account::new(rng)?;
1036 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1038 assert!(bft.is_timer_expired());
1039 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1041 assert!(!result);
1043 for certificate in certificates.iter() {
1045 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1046 }
1047 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1049 assert!(result); let leader_certificate = sample_batch_certificate(rng);
1052 *bft.leader_certificate.write() = Some(leader_certificate);
1053 let result = bft.is_leader_quorum_or_nonleaders_available(1);
1055 assert!(result); Ok(())
1058 }
1059
1060 #[test]
1061 #[tracing_test::traced_test]
1062 fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1063 let rng = &mut TestRng::default();
1064
1065 let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1067 assert_eq!(committee.starting_round(), 1);
1068 assert_eq!(storage.current_round(), 1);
1069 assert_eq!(storage.max_gc_rounds(), 10);
1070
1071 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1073 assert!(bft.is_timer_expired());
1074
1075 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1078 assert!(!result);
1079 Ok(())
1080 }
1081
1082 #[test]
1083 #[tracing_test::traced_test]
1084 fn test_is_leader_quorum_even() -> Result<()> {
1085 let rng = &mut TestRng::default();
1086
1087 let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1089 assert_eq!(committee.starting_round(), 2);
1090 assert_eq!(storage.current_round(), 2);
1091 assert_eq!(storage.max_gc_rounds(), 10);
1092
1093 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1095 assert!(bft.is_timer_expired());
1096
1097 let result = bft.is_leader_quorum_or_nonleaders_available(2);
1099 assert!(!result);
1100 Ok(())
1101 }
1102
1103 #[test]
1104 #[tracing_test::traced_test]
1105 fn test_is_even_round_ready() -> Result<()> {
1106 let rng = &mut TestRng::default();
1107
1108 let mut certificates = IndexSet::new();
1110 certificates.insert(sample_batch_certificate_for_round(2, rng));
1111 certificates.insert(sample_batch_certificate_for_round(2, rng));
1112 certificates.insert(sample_batch_certificate_for_round(2, rng));
1113 certificates.insert(sample_batch_certificate_for_round(2, rng));
1114
1115 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1117 2,
1118 vec![
1119 certificates[0].author(),
1120 certificates[1].author(),
1121 certificates[2].author(),
1122 certificates[3].author(),
1123 ],
1124 rng,
1125 );
1126
1127 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1129 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
1131 let account = Account::new(rng)?;
1133
1134 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1136 assert!(bft.is_timer_expired());
1137
1138 let leader_certificate = sample_batch_certificate_for_round(2, rng);
1140 *bft.leader_certificate.write() = Some(leader_certificate);
1141 let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1142 assert!(!result);
1144 let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1146 assert!(result);
1147
1148 let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1150 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1152 if !bft_timer.is_timer_expired() {
1153 assert!(!result);
1154 }
1155 std::thread::sleep(MAX_LEADER_CERTIFICATE_DELAY);
1157 let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1159 if bft_timer.is_timer_expired() {
1160 assert!(result);
1161 } else {
1162 assert!(!result);
1163 }
1164
1165 Ok(())
1166 }
1167
1168 #[test]
1169 #[tracing_test::traced_test]
1170 fn test_update_leader_certificate_odd() -> Result<()> {
1171 let rng = &mut TestRng::default();
1172
1173 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1175 assert_eq!(storage.max_gc_rounds(), 10);
1176
1177 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1179 assert!(bft.is_timer_expired());
1180
1181 let result = bft.update_leader_certificate_to_even_round(1);
1183 assert!(!result);
1184 Ok(())
1185 }
1186
1187 #[test]
1188 #[tracing_test::traced_test]
1189 fn test_update_leader_certificate_bad_round() -> Result<()> {
1190 let rng = &mut TestRng::default();
1191
1192 let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1194 assert_eq!(storage.max_gc_rounds(), 10);
1195
1196 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1198
1199 let result = bft.update_leader_certificate_to_even_round(6);
1201 assert!(!result);
1202 Ok(())
1203 }
1204
/// Verifies that `update_leader_certificate_to_even_round` returns `true` for round 2
/// once the round-2 leader certificate is in storage and set on the BFT.
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    let mut rng = TestRng::default();

    // Only the previous certificates of the sampled round-3 certificate are used below.
    let round = 3;
    let (_, previous_certificates) =
        snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
            round,
            &mut rng,
        );

    // The committee consists of the authors of the first four previous certificates.
    let members = (0..4).map(|index| previous_certificates[index].author()).collect::<Vec<_>>();
    let committee =
        snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(2, members, &mut rng);

    // Back the storage with a mock ledger that serves the sampled committee.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
    for previous_certificate in previous_certificates.iter().take(4) {
        storage.testing_only_insert_certificate_testing_only(previous_certificate.clone());
    }
    assert_eq!(storage.current_round(), 2);

    // Look up the certificate authored by the round-2 leader.
    let round_two_leader = committee.get_leader(2).unwrap();
    let round_two_leader_certificate =
        storage.get_certificate_for_round_with_author(2, round_two_leader).unwrap();

    // Initialize the BFT and install the leader certificate directly.
    let bft = initialize_bft(Account::new(&mut rng)?, storage, ledger)?;
    *bft.leader_certificate.write() = Some(round_two_leader_certificate);

    // The update for the even round is expected to succeed.
    assert!(bft.update_leader_certificate_to_even_round(2));

    Ok(())
}
1261
1262 #[tokio::test]
1263 #[tracing_test::traced_test]
1264 async fn test_order_dag_with_dfs() -> Result<()> {
1265 let rng = &mut TestRng::default();
1266
1267 let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1269
1270 let previous_round = 2; let current_round = previous_round + 1;
1273
1274 let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1276 current_round,
1277 rng,
1278 );
1279
1280 {
1284 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
1286 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1288
1289 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1291
1292 for certificate in previous_certificates.clone() {
1294 bft.add_certificate_from_sync(certificate);
1295 }
1296
1297 let result = bft.order_dag_with_dfs(certificate.clone());
1299 assert!(result.is_ok());
1300 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1301 assert_eq!(candidate_certificates.len(), 1);
1302 let expected_certificates = vec![certificate.clone()];
1303 assert_eq!(
1304 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1305 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1306 );
1307 assert_eq!(candidate_certificates, expected_certificates);
1308 }
1309
1310 {
1314 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1).unwrap();
1316 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1318
1319 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1321
1322 for certificate in previous_certificates.clone() {
1324 bft.add_certificate_from_sync(certificate);
1325 }
1326
1327 let result = bft.order_dag_with_dfs(certificate.clone());
1329 assert!(result.is_ok());
1330 let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1331 assert_eq!(candidate_certificates.len(), 5);
1332 let expected_certificates = vec![
1333 previous_certificates[0].clone(),
1334 previous_certificates[1].clone(),
1335 previous_certificates[2].clone(),
1336 previous_certificates[3].clone(),
1337 certificate,
1338 ];
1339 assert_eq!(
1340 candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1341 expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1342 );
1343 assert_eq!(candidate_certificates, expected_certificates);
1344 }
1345
1346 Ok(())
1347 }
1348
/// Verifies that `order_dag_with_dfs` fails with a "Missing previous certificate" error
/// when none of the certificate's previous certificates were added to the BFT.
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample the test instance and sanity-check its starting state.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, &mut rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample a certificate for the current round together with its previous certificates.
    let (certificate, previous_certificates) =
        snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
            current_round,
            &mut rng,
        );
    let previous_certificate_ids = previous_certificates.iter().map(|c| c.id()).collect::<IndexSet<_>>();

    // The previous certificates are deliberately never inserted into this BFT.
    let bft = initialize_bft(account, storage, ledger)?;

    // The expected error names the previous certificate ID at index 3.
    let expected_message = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    let outcome = bft.order_dag_with_dfs(certificate);
    assert!(outcome.is_err());
    assert_eq!(outcome.unwrap_err().to_string(), expected_message);
    Ok(())
}
1389
1390 #[tokio::test]
1391 async fn test_bft_gc_on_commit() -> Result<()> {
1392 let rng = &mut TestRng::default();
1393
1394 let max_gc_rounds = 1;
1396 let committee_round = 0;
1397 let commit_round = 2;
1398 let current_round = commit_round + 1;
1399
1400 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1402 current_round,
1403 rng,
1404 );
1405
1406 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1408 committee_round,
1409 vec![
1410 certificates[0].author(),
1411 certificates[1].author(),
1412 certificates[2].author(),
1413 certificates[3].author(),
1414 ],
1415 rng,
1416 );
1417
1418 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1420
1421 let transmissions = Arc::new(BFTMemoryService::new());
1423 let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds).unwrap();
1424 for certificate in certificates.iter() {
1426 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1427 }
1428
1429 let leader = committee.get_leader(commit_round).unwrap();
1431 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1432
1433 let account = Account::new(rng)?;
1435 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1436
1437 *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1439
1440 assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1442
1443 for certificate in certificates {
1445 bft.add_certificate_from_sync(certificate);
1446 }
1447
1448 bft.commit_leader_certificate(leader_certificate).await.unwrap();
1450
1451 assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1453
1454 Ok(())
1455 }
1456
1457 #[tokio::test]
1458 #[tracing_test::traced_test]
1459 async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1460 let rng = &mut TestRng::default();
1461
1462 let max_gc_rounds = 1;
1464 let committee_round = 0;
1465 let commit_round = 2;
1466 let current_round = commit_round + 1;
1467
1468 let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1470 current_round,
1471 rng,
1472 );
1473
1474 let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1476 committee_round,
1477 vec![
1478 certificates[0].author(),
1479 certificates[1].author(),
1480 certificates[2].author(),
1481 certificates[3].author(),
1482 ],
1483 rng,
1484 );
1485
1486 let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1488
1489 let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1491 for certificate in certificates.iter() {
1493 storage.testing_only_insert_certificate_testing_only(certificate.clone());
1494 }
1495
1496 let leader = committee.get_leader(commit_round).unwrap();
1498 let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1499
1500 let account = Account::new(rng)?;
1502 let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1503
1504 *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1506
1507 for certificate in certificates.clone() {
1509 bft.add_certificate_from_sync(certificate);
1510 }
1511
1512 bft.commit_leader_certificate(leader_certificate.clone()).await.unwrap();
1514
1515 let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
1519 let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1521
1522 for cert in certificates.iter() {
1524 bootup_bft.add_certificate_from_sync(cert.clone());
1525 bootup_bft.commit_certificate_from_sync(cert);
1526 }
1527
1528 assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1530
1531 assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1533 assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1534
1535 for certificate in certificates {
1537 let certificate_round = certificate.round();
1538 let certificate_id = certificate.id();
1539 assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1541 assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1544 }
1545
1546 Ok(())
1547 }
1548
/// Compares a BFT that receives every certificate through `add_new_certificate` with a
/// second ("bootup") BFT that replays the pre-shutdown certificates through the sync path
/// and only then receives the post-shutdown certificates.
///
/// Both instances must agree on the last committed round, on which certificates are
/// recently committed, and on the per-round sizes of the subDAG ordered for the
/// `next_round` leader certificate.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
    let rng = &mut TestRng::default();

    // Round parameters for this scenario.
    let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
    let committee_round = 0;
    let commit_round = 2;
    let current_round = commit_round + 1;
    let next_round = current_round + 1;

    // Build a four-member committee and one certificate per member for every round in `0..commit_round + 3`.
    let (round_to_certificates_map, committee) = {
        // Sample the four validator keys and derive their addresses.
        let private_keys = [
            PrivateKey::new(rng).unwrap(),
            PrivateKey::new(rng).unwrap(),
            PrivateKey::new(rng).unwrap(),
            PrivateKey::new(rng).unwrap(),
        ];
        let addresses = vec![
            Address::try_from(private_keys[0])?,
            Address::try_from(private_keys[1])?,
            Address::try_from(private_keys[2])?,
            Address::try_from(private_keys[3])?,
        ];
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
            committee_round,
            addresses,
            rng,
        );
        // Maps each round to the certificates created for it.
        let mut round_to_certificates_map: IndexMap<
            u64,
            IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
        > = IndexMap::new();
        // Seed certificates; their IDs are never referenced because rounds 0 and 1 use an empty
        // previous-certificate set and the variable is overwritten at the end of round 0.
        let mut previous_certificates = IndexSet::with_capacity(4);
        for _ in 0..4 {
            previous_certificates.insert(sample_batch_certificate(rng));
        }
        for round in 0..commit_round + 3 {
            let mut current_certificates = IndexSet::new();
            // Rounds 0 and 1 reference no previous certificates; later rounds reference the whole previous round.
            let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                IndexSet::new()
            } else {
                previous_certificates.iter().map(|c| c.id()).collect()
            };
            // All certificates of a round share the same transmission IDs, timestamp, and committee ID.
            let transmission_ids =
                snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
                    .into_iter()
                    .collect::<IndexSet<_>>();
            let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
            let committee_id = committee.id();
            for (i, private_key_1) in private_keys.iter().enumerate() {
                // The batch header is authored by validator `i`.
                let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
                    private_key_1,
                    round,
                    timestamp,
                    committee_id,
                    transmission_ids.clone(),
                    previous_certificate_ids.clone(),
                    rng,
                )
                .unwrap();
                // Every other validator signs the batch ID.
                let mut signatures = IndexSet::with_capacity(4);
                for (j, private_key_2) in private_keys.iter().enumerate() {
                    if i != j {
                        signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                    }
                }
                let certificate =
                    snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
                current_certificates.insert(certificate);
            }
            // Record this round and make it the "previous" round for the next iteration.
            round_to_certificates_map.insert(round, current_certificates.clone());
            previous_certificates = current_certificates.clone();
        }
        (round_to_certificates_map, committee)
    };

    // Set up the ledger and storage, and determine the leaders of the two rounds of interest.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
    let leader = committee.get_leader(commit_round).unwrap();
    let next_leader = committee.get_leader(next_round).unwrap();
    // Pre-shutdown set: every certificate of rounds `1..commit_round`, plus only the leader's
    // certificate of `commit_round`.
    let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
    for i in 1..=commit_round {
        let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
        if i == commit_round {
            let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
            if let Some(c) = leader_certificate {
                pre_shutdown_certificates.push(c.clone());
            }
            continue;
        }
        pre_shutdown_certificates.extend(certificates);
    }
    for certificate in pre_shutdown_certificates.iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // Post-shutdown set: every certificate of rounds `commit_round..=commit_round + 2`.
    let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
        Vec::new();
    for j in commit_round..=commit_round + 2 {
        let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
        post_shutdown_certificates.extend(certificate);
    }
    for certificate in post_shutdown_certificates.iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // Fetch the two leader certificates back from storage.
    let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
    let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();

    // Initialize the reference BFT.
    let account = Account::new(rng)?;
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

    // Replace its DAG with a mock whose last committed round is 0.
    *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);

    // Add the pre-shutdown certificates as new certificates.
    for certificate in pre_shutdown_certificates.clone() {
        assert!(bft.add_new_certificate(certificate).await.is_ok());
    }

    // Add the post-shutdown certificates as new certificates.
    for certificate in post_shutdown_certificates.clone() {
        assert!(bft.add_new_certificate(certificate).await.is_ok());
    }
    // Record the (round, certificate count) shape of the subDAG ordered for the next leader, then commit it.
    let commit_subdag = bft.order_dag_with_dfs(next_leader_certificate.clone()).unwrap();
    let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
    bft.commit_leader_certificate(next_leader_certificate.clone()).await.unwrap();

    // Create a fresh storage for the bootup BFT.
    let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();

    // Initialize the bootup BFT with the same account and ledger.
    let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;

    // Replay the pre-shutdown certificates through the sync path (add, then commit).
    for cert in pre_shutdown_certificates.iter() {
        bootup_bft.add_certificate_from_sync(cert.clone());
        bootup_bft.commit_certificate_from_sync(cert);
    }

    // Insert the post-shutdown certificates into the bootup storage, then add them as new certificates.
    for certificate in post_shutdown_certificates.iter() {
        bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
    }
    for certificate in post_shutdown_certificates.clone() {
        assert!(bootup_bft.add_new_certificate(certificate).await.is_ok());
    }
    // Order and commit the same next-leader certificate on the bootup BFT.
    let commit_subdag_bootup = bootup_bft.order_dag_with_dfs(next_leader_certificate.clone()).unwrap();
    let commit_subdag_metadata_bootup =
        commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
    let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
    bootup_bft.commit_leader_certificate(next_leader_certificate.clone()).await.unwrap();

    // Both BFTs must report the same last committed round.
    assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());

    // Both leader certificates must be recently committed in both DAGs.
    assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
    assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
    assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
    assert!(
        bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
    );

    // Every pre-shutdown certificate must be recently committed, and absent from its round, in both DAGs.
    for certificate in pre_shutdown_certificates.clone() {
        let certificate_round = certificate.round();
        let certificate_id = certificate.id();
        assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
        assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
        assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
        assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
    }

    // The same must hold for every certificate in the subDAG ordered by the bootup BFT.
    for certificate in committed_certificates_bootup.clone() {
        let certificate_round = certificate.round();
        let certificate_id = certificate.id();
        assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
        assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
        assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
        assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
    }

    // The ordered subDAGs must have identical (round, certificate count) shapes.
    assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);

    Ok(())
}
1768
/// Verifies that certificates committed through the sync path before "shutdown" do not
/// reappear in the subDAG that `order_dag_with_dfs` produces for the `next_round`
/// leader certificate.
#[tokio::test]
#[tracing_test::traced_test]
async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
    let rng = &mut TestRng::default();

    // Round parameters for this scenario.
    let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
    let committee_round = 0;
    let commit_round = 2;
    let current_round = commit_round + 1;
    let next_round = current_round + 1;

    // Build a four-member committee and one certificate per member for every round in `0..=commit_round + 2`.
    let (round_to_certificates_map, committee) = {
        // Sample the four validator keys and derive their addresses.
        let private_keys = [
            PrivateKey::new(rng).unwrap(),
            PrivateKey::new(rng).unwrap(),
            PrivateKey::new(rng).unwrap(),
            PrivateKey::new(rng).unwrap(),
        ];
        let addresses = vec![
            Address::try_from(private_keys[0])?,
            Address::try_from(private_keys[1])?,
            Address::try_from(private_keys[2])?,
            Address::try_from(private_keys[3])?,
        ];
        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
            committee_round,
            addresses,
            rng,
        );
        // Maps each round to the certificates created for it.
        let mut round_to_certificates_map: IndexMap<
            u64,
            IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
        > = IndexMap::new();
        // Seed certificates; their IDs are never referenced because rounds 0 and 1 use an empty
        // previous-certificate set and the variable is overwritten at the end of round 0.
        let mut previous_certificates = IndexSet::with_capacity(4);
        for _ in 0..4 {
            previous_certificates.insert(sample_batch_certificate(rng));
        }
        for round in 0..=commit_round + 2 {
            let mut current_certificates = IndexSet::new();
            // Rounds 0 and 1 reference no previous certificates; later rounds reference the whole previous round.
            let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                IndexSet::new()
            } else {
                previous_certificates.iter().map(|c| c.id()).collect()
            };
            // All certificates of a round share the same transmission IDs, timestamp, and committee ID.
            let transmission_ids =
                snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
                    .into_iter()
                    .collect::<IndexSet<_>>();
            let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
            let committee_id = committee.id();
            for (i, private_key_1) in private_keys.iter().enumerate() {
                // The batch header is authored by validator `i`.
                let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
                    private_key_1,
                    round,
                    timestamp,
                    committee_id,
                    transmission_ids.clone(),
                    previous_certificate_ids.clone(),
                    rng,
                )
                .unwrap();
                // Every other validator signs the batch ID.
                let mut signatures = IndexSet::with_capacity(4);
                for (j, private_key_2) in private_keys.iter().enumerate() {
                    if i != j {
                        signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                    }
                }
                let certificate =
                    snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
                current_certificates.insert(certificate);
            }
            // Record this round and make it the "previous" round for the next iteration.
            round_to_certificates_map.insert(round, current_certificates.clone());
            previous_certificates = current_certificates.clone();
        }
        (round_to_certificates_map, committee)
    };

    // Set up the ledger and storage, and determine the leaders of the two rounds of interest.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
    let leader = committee.get_leader(commit_round).unwrap();
    let next_leader = committee.get_leader(next_round).unwrap();
    // Pre-shutdown set: every certificate of rounds `1..commit_round`, plus only the leader's
    // certificate of `commit_round`.
    let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
    for i in 1..=commit_round {
        let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
        if i == commit_round {
            let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
            if let Some(c) = leader_certificate {
                pre_shutdown_certificates.push(c.clone());
            }
            continue;
        }
        pre_shutdown_certificates.extend(certificates);
    }
    for certificate in pre_shutdown_certificates.iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }
    // Initialize the bootup BFT on top of the populated storage.
    let account = Account::new(rng)?;
    let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

    // Replace its DAG with a mock whose last committed round is 0, then replay the
    // pre-shutdown certificates through the sync path (add, then commit).
    *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
    for cert in pre_shutdown_certificates.iter() {
        bootup_bft.add_certificate_from_sync(cert.clone());
        bootup_bft.commit_certificate_from_sync(cert);
    }

    // Post-shutdown set: every certificate of rounds `commit_round..=commit_round + 2`.
    let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
        Vec::new();
    for j in commit_round..=commit_round + 2 {
        let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
        post_shutdown_certificates.extend(certificate);
    }
    for certificate in post_shutdown_certificates.iter() {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }

    // Add the post-shutdown certificates as new certificates.
    for certificate in post_shutdown_certificates.clone() {
        assert!(bootup_bft.add_new_certificate(certificate).await.is_ok());
    }

    // Order the DAG starting from the next-round leader certificate.
    let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
    let commit_subdag = bootup_bft.order_dag_with_dfs(next_leader_certificate).unwrap();
    let committed_certificates = commit_subdag.values().flatten();

    // No pre-shutdown certificate may appear among the newly ordered certificates.
    for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
        for committed_certificate in committed_certificates.clone() {
            assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
        }
    }
    Ok(())
}
1922
/// Verifies that committing the round-4 leader certificate also commits the round-2
/// leader certificate when the DAG reports the two as linked.
///
/// In round 3 only the round-2 leader's own certificate references the round-2 leader
/// certificate; every round-4 certificate references all round-3 certificates.
#[test_log::test(tokio::test)]
async fn test_commit_via_is_linked() {
    let rng = &mut TestRng::default();

    // Round parameters for this scenario.
    let committee_round = 0;
    let leader_round_1 = 2;
    let leader_round_2 = 4;
    let max_gc_rounds = 50;

    // Sample the validator keys and derive their addresses.
    let num_authors = 4;
    let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
    let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();

    // Set up the committee, ledger, storage, and the BFT under test.
    let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
    let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();

    // Collects the certificates of every round, in insertion order.
    let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();

    // Round 1: one certificate per author with no previous certificates, endorsed by all other keys.
    let round1_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let author = &private_keys[idx];
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();

            sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
        })
        .collect();
    certificates_by_round.insert(1, round1_certs.clone());

    // Determine the leader of the first leader round; its certificate is captured while building round 2.
    let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
    let mut leader1_certificate = None;

    // Round 2: every certificate references all round-1 certificates.
    let round2_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let author = &private_keys[idx];
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();
            let cert = sample_batch_certificate_for_round_with_committee(
                leader_round_1,
                round1_certs.iter().map(|c| c.id()).collect(),
                author,
                &endorsements[..],
                rng,
            );

            // Remember the certificate authored by the round-2 leader.
            if cert.author() == leader1 {
                leader1_certificate = Some(cert.clone());
            }
            cert
        })
        .collect();
    certificates_by_round.insert(leader_round_1, round2_certs.clone());

    // Round 3: each author references all round-2 certificates, except that the leader's
    // round-2 certificate is dropped unless the author is the leader itself.
    let round3_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let author = &private_keys[idx];
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();

            let previous_certificate_ids: IndexSet<_> = round2_certs
                .iter()
                .filter_map(|cert| {
                    if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
                })
                .collect();

            sample_batch_certificate_for_round_with_committee(
                leader_round_1 + 1,
                previous_certificate_ids,
                author,
                &endorsements[..],
                rng,
            )
        })
        .collect();
    certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());

    // Nothing has been added to the BFT yet, so the first leader certificate is uncommitted.
    let leader_certificate_1 = leader1_certificate.unwrap();
    assert!(
        !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
        "Leader certificate 1 should not be committed yet"
    );
    assert_eq!(bft.dag.read().last_committed_round(), 0);

    // Round 4: every certificate references all round-3 certificates.
    let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
    let round4_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();

            sample_batch_certificate_for_round_with_committee(
                leader_round_2,
                round3_certs.iter().map(|c| c.id()).collect(),
                &private_keys[idx],
                &endorsements[..],
                rng,
            )
        })
        .collect();
    certificates_by_round.insert(leader_round_2, round4_certs.clone());

    // Insert every certificate into storage and into the DAG (via the sync path), round by round.
    for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
        bft.add_certificate_from_sync(certificate);
    }

    // Fetch the certificate authored by the round-4 leader.
    let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();

    // The DAG must report the two leader certificates as linked.
    assert!(
        bft.dag.read().is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
        "Leader certificate 1 should be linked to leader certificate 2"
    );

    // Commit the round-4 leader certificate.
    bft.commit_leader_certificate(leader_certificate_2.clone()).await.unwrap();

    // The round-2 leader certificate must have been committed as part of that commit.
    assert!(
        bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
        "Leader certificate for round 2 should be committed when committing at round 4"
    );

    // The round-4 leader certificate itself must be committed.
    assert!(
        bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
        "Leader certificate for round 4 should be committed"
    );

    // The last committed round must now be round 4.
    assert_eq!(bft.dag.read().last_committed_round(), 4);
}
2073
/// Verifies that committing the round-4 leader certificate does NOT commit the round-2
/// leader certificate when the DAG reports the two as not linked.
///
/// In round 3 only the round-2 leader's own certificate references the round-2 leader
/// certificate, and every round-4 certificate omits the round-3 certificate authored by
/// that leader.
#[test_log::test(tokio::test)]
async fn test_commit_via_is_linked_with_skipped_anchor() {
    let rng = &mut TestRng::default();

    // Round parameters for this scenario.
    let committee_round = 0;
    let leader_round_1 = 2;
    let leader_round_2 = 4;
    let max_gc_rounds = 50;

    // Sample the validator keys and derive their addresses.
    let num_authors = 4;
    let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
    let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();

    // Set up the committee, ledger, storage, and the BFT under test.
    let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds).unwrap();
    let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();

    // Collects the certificates of every round, in insertion order.
    let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();

    // Round 1: one certificate per author with no previous certificates, endorsed by all other keys.
    let round1_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let author = &private_keys[idx];
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();

            sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
        })
        .collect();
    certificates_by_round.insert(1, round1_certs.clone());

    // Determine the leader of the first leader round; its certificate is captured while building round 2.
    let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
    let mut leader1_certificate = None;

    // Round 2: every certificate references all round-1 certificates.
    let round2_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let author = &private_keys[idx];
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();
            let cert = sample_batch_certificate_for_round_with_committee(
                leader_round_1,
                round1_certs.iter().map(|c| c.id()).collect(),
                author,
                &endorsements[..],
                rng,
            );

            // Remember the certificate authored by the round-2 leader.
            if cert.author() == leader1 {
                leader1_certificate = Some(cert.clone());
            }
            cert
        })
        .collect();
    certificates_by_round.insert(leader_round_1, round2_certs.clone());

    // Round 3: each author references all round-2 certificates, except that the leader's
    // round-2 certificate is dropped unless the author is the leader itself.
    let round3_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let author = &private_keys[idx];
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();

            let previous_certificate_ids: IndexSet<_> = round2_certs
                .iter()
                .filter_map(|cert| {
                    if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
                })
                .collect();

            sample_batch_certificate_for_round_with_committee(
                leader_round_1 + 1,
                previous_certificate_ids,
                author,
                &endorsements[..],
                rng,
            )
        })
        .collect();
    certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());

    // Nothing has been added to the BFT yet, so the first leader certificate is uncommitted.
    let leader_certificate_1 = leader1_certificate.unwrap();
    assert!(
        !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
        "Leader certificate 1 should not be committed yet"
    );

    // Round 4: every certificate references the round-3 certificates except the one authored by `leader1`.
    let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
    let round4_certs: IndexSet<_> = (0..num_authors)
        .map(|idx| {
            let endorsements: Vec<_> = private_keys
                .iter()
                .enumerate()
                .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
                .collect();

            // Dropping the round-3 certificate authored by `leader1` removes the only round-3
            // certificate that references the round-2 leader certificate.
            let previous_certificate_ids: IndexSet<_> = round3_certs
                .iter()
                .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
                .collect();

            sample_batch_certificate_for_round_with_committee(
                leader_round_2,
                previous_certificate_ids,
                &private_keys[idx],
                &endorsements[..],
                rng,
            )
        })
        .collect();
    certificates_by_round.insert(leader_round_2, round4_certs.clone());

    // Insert every certificate into storage and into the DAG (via the sync path), round by round.
    for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
        bft.add_certificate_from_sync(certificate);
    }

    // Fetch the certificate authored by the round-4 leader.
    let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();

    // The DAG must report the two leader certificates as not linked, and nothing is committed yet.
    assert!(
        !bft.dag.read().is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
        "Leader certificate 1 should not be linked to leader certificate 2"
    );
    assert_eq!(bft.dag.read().last_committed_round(), 0);

    // Commit the round-4 leader certificate.
    bft.commit_leader_certificate(leader_certificate_2.clone()).await.unwrap();

    // The round-2 leader certificate must have been skipped.
    assert!(
        !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
        "Leader certificate for round 2 should not be committed when committing at round 4"
    );

    // The round-4 leader certificate itself must be committed, and the last committed round must be 4.
    assert!(
        bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
        "Leader certificate for round 4 should be committed"
    );
    assert_eq!(bft.dag.read().last_committed_round(), 4);
}
2227}