snarkos_node_bft/
bft.rs

1// Copyright 2024-2025 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkvm::{
34    console::account::Address,
35    ledger::{
36        block::Transaction,
37        committee::Committee,
38        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
39        puzzle::{Solution, SolutionID},
40    },
41    prelude::{Field, Network, Result, bail, ensure},
42};
43
44use aleo_std::StorageMode;
45use colored::Colorize;
46use indexmap::{IndexMap, IndexSet};
47#[cfg(feature = "locktick")]
48use locktick::{
49    parking_lot::{Mutex, RwLock},
50    tokio::Mutex as TMutex,
51};
52#[cfg(not(feature = "locktick"))]
53use parking_lot::{Mutex, RwLock};
54use std::{
55    collections::{BTreeMap, HashSet},
56    future::Future,
57    net::SocketAddr,
58    sync::{
59        Arc,
60        atomic::{AtomicI64, Ordering},
61    },
62};
63#[cfg(not(feature = "locktick"))]
64use tokio::sync::Mutex as TMutex;
65use tokio::{
66    sync::{OnceCell, oneshot},
67    task::JoinHandle,
68};
69
70#[derive(Clone)]
71pub struct BFT<N: Network> {
72    /// The primary.
73    primary: Primary<N>,
74    /// The DAG.
75    dag: Arc<RwLock<DAG<N>>>,
76    /// The batch certificate of the leader from the current even round, if one was present.
77    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
78    /// The timer for the leader certificate to be received.
79    leader_certificate_timer: Arc<AtomicI64>,
80    /// The consensus sender.
81    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
82    /// The spawned handles.
83    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
84    /// The BFT lock.
85    lock: Arc<TMutex<()>>,
86}
87
88impl<N: Network> BFT<N> {
89    /// Initializes a new instance of the BFT.
90    pub fn new(
91        account: Account<N>,
92        storage: Storage<N>,
93        ledger: Arc<dyn LedgerService<N>>,
94        ip: Option<SocketAddr>,
95        trusted_validators: &[SocketAddr],
96        storage_mode: StorageMode,
97    ) -> Result<Self> {
98        Ok(Self {
99            primary: Primary::new(account, storage, ledger, ip, trusted_validators, storage_mode)?,
100            dag: Default::default(),
101            leader_certificate: Default::default(),
102            leader_certificate_timer: Default::default(),
103            consensus_sender: Default::default(),
104            handles: Default::default(),
105            lock: Default::default(),
106        })
107    }
108
109    /// Run the BFT instance.
110    pub async fn run(
111        &mut self,
112        consensus_sender: Option<ConsensusSender<N>>,
113        primary_sender: PrimarySender<N>,
114        primary_receiver: PrimaryReceiver<N>,
115    ) -> Result<()> {
116        info!("Starting the BFT instance...");
117        // Initialize the BFT channels.
118        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
119        // First, start the BFT handlers.
120        self.start_handlers(bft_receiver);
121        // Next, run the primary instance.
122        self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
123        // Lastly, set the consensus sender.
124        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
125        if let Some(consensus_sender) = consensus_sender {
126            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
127        }
128        Ok(())
129    }
130
131    /// Returns `true` if the primary is synced.
132    pub fn is_synced(&self) -> bool {
133        self.primary.is_synced()
134    }
135
136    /// Returns the primary.
137    pub const fn primary(&self) -> &Primary<N> {
138        &self.primary
139    }
140
141    /// Returns the storage.
142    pub const fn storage(&self) -> &Storage<N> {
143        self.primary.storage()
144    }
145
146    /// Returns the ledger.
147    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
148        self.primary.ledger()
149    }
150
151    /// Returns the leader of the current even round, if one was present.
152    pub fn leader(&self) -> Option<Address<N>> {
153        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
154    }
155
156    /// Returns the certificate of the leader from the current even round, if one was present.
157    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
158        &self.leader_certificate
159    }
160}
161
162impl<N: Network> BFT<N> {
163    /// Returns the number of unconfirmed transmissions.
164    pub fn num_unconfirmed_transmissions(&self) -> usize {
165        self.primary.num_unconfirmed_transmissions()
166    }
167
168    /// Returns the number of unconfirmed ratifications.
169    pub fn num_unconfirmed_ratifications(&self) -> usize {
170        self.primary.num_unconfirmed_ratifications()
171    }
172
173    /// Returns the number of solutions.
174    pub fn num_unconfirmed_solutions(&self) -> usize {
175        self.primary.num_unconfirmed_solutions()
176    }
177
178    /// Returns the number of unconfirmed transactions.
179    pub fn num_unconfirmed_transactions(&self) -> usize {
180        self.primary.num_unconfirmed_transactions()
181    }
182}
183
184impl<N: Network> BFT<N> {
185    /// Returns the worker transmission IDs.
186    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
187        self.primary.worker_transmission_ids()
188    }
189
190    /// Returns the worker transmissions.
191    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
192        self.primary.worker_transmissions()
193    }
194
195    /// Returns the worker solutions.
196    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
197        self.primary.worker_solutions()
198    }
199
200    /// Returns the worker transactions.
201    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
202        self.primary.worker_transactions()
203    }
204}
205
206impl<N: Network> BFT<N> {
207    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
208    fn update_to_next_round(&self, current_round: u64) -> bool {
209        // Ensure the current round is at least the storage round (this is a sanity check).
210        let storage_round = self.storage().current_round();
211        if current_round < storage_round {
212            debug!(
213                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
214            );
215            return false;
216        }
217
218        // Determine if the BFT is ready to update to the next round.
219        let is_ready = match current_round % 2 == 0 {
220            true => self.update_leader_certificate_to_even_round(current_round),
221            false => self.is_leader_quorum_or_nonleaders_available(current_round),
222        };
223
224        #[cfg(feature = "metrics")]
225        {
226            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
227            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
228            if start > 0 {
229                let end = now();
230                let elapsed = std::time::Duration::from_secs((end - start) as u64);
231                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
232            }
233        }
234
235        // Log whether the round is going to update.
236        if current_round % 2 == 0 {
237            // Determine if there is a leader certificate.
238            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
239                // Ensure the state of the leader certificate is consistent with the BFT being ready.
240                if !is_ready {
241                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
242                }
243                // Log the leader election.
244                let leader_round = leader_certificate.round();
245                match leader_round == current_round {
246                    true => {
247                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
248                        #[cfg(feature = "metrics")]
249                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
250                    }
251                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
252                }
253            } else {
254                match is_ready {
255                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
256                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
257                }
258            }
259        }
260
261        // If the BFT is ready, then update to the next round.
262        if is_ready {
263            // Update to the next round in storage.
264            if let Err(e) = self.storage().increment_to_next_round(current_round) {
265                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
266                return false;
267            }
268            // Update the timer for the leader certificate.
269            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
270        }
271
272        is_ready
273    }
274
275    /// Updates the leader certificate to the current even round,
276    /// returning `true` if the BFT is ready to update to the next round.
277    ///
278    /// This method runs on every even round, by determining the leader of the current even round,
279    /// and setting the leader certificate to their certificate in the round, if they were present.
280    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
281        // Retrieve the current round.
282        let current_round = self.storage().current_round();
283        // Ensure the current round matches the given round.
284        if current_round != even_round {
285            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
286            return false;
287        }
288
289        // If the current round is odd, return false.
290        if current_round % 2 != 0 || current_round < 2 {
291            error!("BFT cannot update the leader certificate in an odd round");
292            return false;
293        }
294
295        // Retrieve the certificates for the current round.
296        let current_certificates = self.storage().get_certificates_for_round(current_round);
297        // If there are no current certificates, set the leader certificate to 'None', and return early.
298        if current_certificates.is_empty() {
299            // Set the leader certificate to 'None'.
300            *self.leader_certificate.write() = None;
301            return false;
302        }
303
304        // Retrieve the committee lookback of the current round.
305        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
306            Ok(committee) => committee,
307            Err(e) => {
308                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
309                return false;
310            }
311        };
312        // Determine the leader of the current round.
313        let leader = match self.ledger().latest_leader() {
314            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
315            _ => {
316                // Compute the leader for the current round.
317                let computed_leader = match committee_lookback.get_leader(current_round) {
318                    Ok(leader) => leader,
319                    Err(e) => {
320                        error!("BFT failed to compute the leader for the even round {current_round} - {e}");
321                        return false;
322                    }
323                };
324
325                // Cache the computed leader.
326                self.ledger().update_latest_leader(current_round, computed_leader);
327
328                computed_leader
329            }
330        };
331        // Find and set the leader certificate, if the leader was present in the current even round.
332        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
333        *self.leader_certificate.write() = leader_certificate.cloned();
334
335        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
336    }
337
338    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
339    ///  - If the leader certificate is set for the current even round.
340    ///  - The timer for the leader certificate has expired.
341    fn is_even_round_ready_for_next_round(
342        &self,
343        certificates: IndexSet<BatchCertificate<N>>,
344        committee: Committee<N>,
345        current_round: u64,
346    ) -> bool {
347        // Retrieve the authors for the current round.
348        let authors = certificates.into_iter().map(|c| c.author()).collect();
349        // Check if quorum threshold is reached.
350        if !committee.is_quorum_threshold_reached(&authors) {
351            trace!("BFT failed to reach quorum threshold in even round {current_round}");
352            return false;
353        }
354        // If the leader certificate is set for the current even round, return 'true'.
355        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
356            if leader_certificate.round() == current_round {
357                return true;
358            }
359        }
360        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
361        if self.is_timer_expired() {
362            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
363            return true;
364        }
365        // Otherwise, return 'false'.
366        false
367    }
368
369    /// Returns `true` if the timer for the leader certificate has expired.
370    fn is_timer_expired(&self) -> bool {
371        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
372    }
373
374    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
375    ///  - The leader certificate is `None`.
376    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
377    ///  - The leader certificate timer has expired.
378    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
379        // Retrieve the current round.
380        let current_round = self.storage().current_round();
381        // Ensure the current round matches the given round.
382        if current_round != odd_round {
383            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
384            return false;
385        }
386        // If the current round is even, return false.
387        if current_round % 2 != 1 {
388            error!("BFT does not compute stakes for the leader certificate in an even round");
389            return false;
390        }
391        // Retrieve the certificates for the current round.
392        let current_certificates = self.storage().get_certificates_for_round(current_round);
393        // Retrieve the committee lookback for the current round.
394        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
395            Ok(committee) => committee,
396            Err(e) => {
397                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
398                return false;
399            }
400        };
401        // Retrieve the authors of the current certificates.
402        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
403        // Check if quorum threshold is reached.
404        if !committee_lookback.is_quorum_threshold_reached(&authors) {
405            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
406            return false;
407        }
408        // Retrieve the leader certificate.
409        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
410            // If there is no leader certificate for the previous round, return 'true'.
411            return true;
412        };
413        // Compute the stake for the leader certificate.
414        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
415            leader_certificate.id(),
416            current_certificates,
417            &committee_lookback,
418        );
419        // Return 'true' if any of the following conditions hold:
420        stake_with_leader >= committee_lookback.availability_threshold()
421            || stake_without_leader >= committee_lookback.quorum_threshold()
422            || self.is_timer_expired()
423    }
424
425    /// Computes the amount of stake that has & has not signed for the leader certificate.
426    fn compute_stake_for_leader_certificate(
427        &self,
428        leader_certificate_id: Field<N>,
429        current_certificates: IndexSet<BatchCertificate<N>>,
430        current_committee: &Committee<N>,
431    ) -> (u64, u64) {
432        // If there are no current certificates, return early.
433        if current_certificates.is_empty() {
434            return (0, 0);
435        }
436
437        // Initialize a tracker for the stake with the leader.
438        let mut stake_with_leader = 0u64;
439        // Initialize a tracker for the stake without the leader.
440        let mut stake_without_leader = 0u64;
441        // Iterate over the current certificates.
442        for certificate in current_certificates {
443            // Retrieve the stake for the author of the certificate.
444            let stake = current_committee.get_stake(certificate.author());
445            // Determine if the certificate includes the leader.
446            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
447                // If the certificate includes the leader, add the stake to the stake with the leader.
448                true => stake_with_leader = stake_with_leader.saturating_add(stake),
449                // If the certificate does not include the leader, add the stake to the stake without the leader.
450                false => stake_without_leader = stake_without_leader.saturating_add(stake),
451            }
452        }
453        // Return the stake with the leader, and the stake without the leader.
454        (stake_with_leader, stake_without_leader)
455    }
456}
457
458impl<N: Network> BFT<N> {
459    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
460    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
461        &self,
462        certificate: BatchCertificate<N>,
463    ) -> Result<()> {
464        // Acquire the BFT lock.
465        let _lock = self.lock.lock().await;
466
467        // Retrieve the certificate round.
468        let certificate_round = certificate.round();
469        // Insert the certificate into the DAG.
470        self.dag.write().insert(certificate);
471
472        // Construct the commit round.
473        let commit_round = certificate_round.saturating_sub(1);
474        // If the commit round is odd, return early.
475        if commit_round % 2 != 0 || commit_round < 2 {
476            return Ok(());
477        }
478        // If the commit round is at or below the last committed round, return early.
479        if commit_round <= self.dag.read().last_committed_round() {
480            return Ok(());
481        }
482
483        /* Proceeding to check if the leader is ready to be committed. */
484        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
485
486        // Retrieve the committee lookback for the commit round.
487        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
488            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
489        };
490
491        // Either retrieve the cached leader or compute it.
492        let leader = match self.ledger().latest_leader() {
493            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
494            _ => {
495                // Compute the leader for the commit round.
496                let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
497                    bail!("BFT failed to compute the leader for commit round {commit_round}");
498                };
499
500                // Cache the computed leader.
501                self.ledger().update_latest_leader(commit_round, computed_leader);
502
503                computed_leader
504            }
505        };
506
507        // Retrieve the leader certificate for the commit round.
508        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
509        else {
510            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
511            return Ok(());
512        };
513        // Retrieve all of the certificates for the **certificate** round.
514        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
515            // TODO (howardwu): Investigate how many certificates we should have at this point.
516            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
517        };
518        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
519        let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
520        else {
521            bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
522        };
523        // Construct a set over the authors who included the leader's certificate in the certificate round.
524        let authors = certificates
525            .values()
526            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
527                true => Some(c.author()),
528                false => None,
529            })
530            .collect();
531        // Check if the leader is ready to be committed.
532        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
533            // If the leader is not ready to be committed, return early.
534            trace!("BFT is not ready to commit {commit_round}");
535            return Ok(());
536        }
537
538        /* Proceeding to commit the leader. */
539        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
540
541        // Commit the leader certificate, and all previous leader certificates since the last committed round.
542        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
543    }
544
545    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
546    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
547        &self,
548        leader_certificate: BatchCertificate<N>,
549    ) -> Result<()> {
550        // Fetch the leader round.
551        let latest_leader_round = leader_certificate.round();
552        // Determine the list of all previous leader certificates since the last committed round.
553        // The order of the leader certificates is from **newest** to **oldest**.
554        let mut leader_certificates = vec![leader_certificate.clone()];
555        {
556            // Retrieve the leader round.
557            let leader_round = leader_certificate.round();
558
559            let mut current_certificate = leader_certificate;
560            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
561            {
562                // Retrieve the previous committee for the leader round.
563                let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
564                    Ok(committee) => committee,
565                    Err(e) => {
566                        bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
567                    }
568                };
569                // Either retrieve the cached leader or compute it.
570                let leader = match self.ledger().latest_leader() {
571                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
572                    _ => {
573                        // Compute the leader for the commit round.
574                        let computed_leader = match previous_committee_lookback.get_leader(round) {
575                            Ok(leader) => leader,
576                            Err(e) => {
577                                bail!("BFT failed to compute the leader for the even round {round} - {e}");
578                            }
579                        };
580
581                        // Cache the computed leader.
582                        self.ledger().update_latest_leader(round, computed_leader);
583
584                        computed_leader
585                    }
586                };
587                // Retrieve the previous leader certificate.
588                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
589                else {
590                    continue;
591                };
592                // Determine if there is a path between the previous certificate and the current certificate.
593                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
594                    // Add the previous leader certificate to the list of certificates to commit.
595                    leader_certificates.push(previous_certificate.clone());
596                    // Update the current certificate to the previous leader certificate.
597                    current_certificate = previous_certificate;
598                }
599            }
600        }
601
602        // Iterate over the leader certificates to commit.
603        for leader_certificate in leader_certificates.into_iter().rev() {
604            // Retrieve the leader certificate round.
605            let leader_round = leader_certificate.round();
606            // Compute the commit subdag.
607            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
608                Ok(subdag) => subdag,
609                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
610            };
611            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
612            if !IS_SYNCING {
613                // Initialize a map for the deduped transmissions.
614                let mut transmissions = IndexMap::new();
615                // Initialize a map for the deduped transaction ids.
616                let mut seen_transaction_ids = IndexSet::new();
617                // Initialize a map for the deduped solution ids.
618                let mut seen_solution_ids = IndexSet::new();
619                // Start from the oldest leader certificate.
620                for certificate in commit_subdag.values().flatten() {
621                    // Retrieve the transmissions.
622                    for transmission_id in certificate.transmission_ids() {
623                        // If the transaction ID or solution ID already exists in the map, skip it.
624                        // Note: This additional check is done to ensure that we do not include duplicate
625                        // transaction IDs or solution IDs that may have a different transmission ID.
626                        match transmission_id {
627                            TransmissionID::Solution(solution_id, _) => {
628                                // If the solution already exists, skip it.
629                                if seen_solution_ids.contains(&solution_id) {
630                                    continue;
631                                }
632                            }
633                            TransmissionID::Transaction(transaction_id, _) => {
634                                // If the transaction already exists, skip it.
635                                if seen_transaction_ids.contains(transaction_id) {
636                                    continue;
637                                }
638                            }
639                            TransmissionID::Ratification => {
640                                bail!("Ratifications are currently not supported in the BFT.")
641                            }
642                        }
643                        // If the transmission already exists in the map, skip it.
644                        if transmissions.contains_key(transmission_id) {
645                            continue;
646                        }
647                        // If the transmission already exists in the ledger, skip it.
648                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
649                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
650                            continue;
651                        }
652                        // Retrieve the transmission.
653                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
654                            bail!(
655                                "BFT failed to retrieve transmission '{}.{}' from round {}",
656                                fmt_id(transmission_id),
657                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
658                                certificate.round()
659                            );
660                        };
661                        // Insert the transaction ID or solution ID into the map.
662                        match transmission_id {
663                            TransmissionID::Solution(id, _) => {
664                                seen_solution_ids.insert(id);
665                            }
666                            TransmissionID::Transaction(id, _) => {
667                                seen_transaction_ids.insert(id);
668                            }
669                            TransmissionID::Ratification => {}
670                        }
671                        // Add the transmission to the set.
672                        transmissions.insert(*transmission_id, transmission);
673                    }
674                }
675                // Trigger consensus, as this will build a new block for the ledger.
676                // Construct the subdag.
677                let subdag = Subdag::from(commit_subdag.clone())?;
678                // Retrieve the anchor round.
679                let anchor_round = subdag.anchor_round();
680                // Retrieve the number of transmissions.
681                let num_transmissions = transmissions.len();
682                // Retrieve metadata about the subdag.
683                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
684
685                // Ensure the subdag anchor round matches the leader round.
686                ensure!(
687                    anchor_round == leader_round,
688                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
689                );
690
691                // Trigger consensus.
692                if let Some(consensus_sender) = self.consensus_sender.get() {
693                    // Initialize a callback sender and receiver.
694                    let (callback_sender, callback_receiver) = oneshot::channel();
695                    // Send the subdag and transmissions to consensus.
696                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
697                    // Await the callback to continue.
698                    match callback_receiver.await {
699                        Ok(Ok(())) => (), // continue
700                        Ok(Err(e)) => {
701                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
702                            return Ok(());
703                        }
704                        Err(e) => {
705                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
706                            return Ok(());
707                        }
708                    }
709                }
710
711                info!(
712                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
713                );
714            }
715
716            // Update the DAG, as the subdag was successfully included into a block.
717            let mut dag_write = self.dag.write();
718            for certificate in commit_subdag.values().flatten() {
719                dag_write.commit(certificate, self.storage().max_gc_rounds());
720            }
721
722            // Update the validator telemetry.
723            #[cfg(feature = "telemetry")]
724            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
725        }
726
727        // Perform garbage collection based on the latest committed leader round.
728        self.storage().garbage_collect_certificates(latest_leader_round);
729
730        Ok(())
731    }
732
733    /// Returns the subdag of batch certificates to commit.
734    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
735        &self,
736        leader_certificate: BatchCertificate<N>,
737    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
738        // Initialize a map for the certificates to commit.
739        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
740        // Initialize a set for the already ordered certificates.
741        let mut already_ordered = HashSet::new();
742        // Initialize a buffer for the certificates to order.
743        let mut buffer = vec![leader_certificate];
744        // Iterate over the certificates to order.
745        while let Some(certificate) = buffer.pop() {
746            // Insert the certificate into the map.
747            commit.entry(certificate.round()).or_default().insert(certificate.clone());
748
749            // Check if the previous certificate is below the GC round.
750            let previous_round = certificate.round().saturating_sub(1);
751            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
752                continue;
753            }
754            // Iterate over the previous certificate IDs.
755            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
756            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
757            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
758                // If the previous certificate is already ordered, continue.
759                if already_ordered.contains(previous_certificate_id) {
760                    continue;
761                }
762                // If the previous certificate was recently committed, continue.
763                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
764                    continue;
765                }
766                // If the previous certificate already exists in the ledger, continue.
767                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
768                    continue;
769                }
770
771                // Retrieve the previous certificate.
772                let previous_certificate = {
773                    // Start by retrieving the previous certificate from the DAG.
774                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
775                        // If the previous certificate is found, return it.
776                        Some(previous_certificate) => previous_certificate,
777                        // If the previous certificate is not found, retrieve it from the storage.
778                        None => match self.storage().get_certificate(*previous_certificate_id) {
779                            // If the previous certificate is found, return it.
780                            Some(previous_certificate) => previous_certificate,
781                            // Otherwise, the previous certificate is missing, and throw an error.
782                            None => bail!(
783                                "Missing previous certificate {} for round {previous_round}",
784                                fmt_id(previous_certificate_id)
785                            ),
786                        },
787                    }
788                };
789                // Insert the previous certificate into the set of already ordered certificates.
790                already_ordered.insert(previous_certificate.id());
791                // Insert the previous certificate into the buffer.
792                buffer.push(previous_certificate);
793            }
794        }
795        // Ensure we only retain certificates that are above the GC round.
796        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
797        // Return the certificates to commit.
798        Ok(commit)
799    }
800
801    /// Returns `true` if there is a path from the previous certificate to the current certificate.
802    fn is_linked(
803        &self,
804        previous_certificate: BatchCertificate<N>,
805        current_certificate: BatchCertificate<N>,
806    ) -> Result<bool> {
807        // Initialize the list containing the traversal.
808        let mut traversal = vec![current_certificate.clone()];
809        // Iterate over the rounds from the current certificate to the previous certificate.
810        for round in (previous_certificate.round()..current_certificate.round()).rev() {
811            // Retrieve all of the certificates for this past round.
812            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
813                // This is a critical error, as the traversal should have these certificates.
814                // If this error is hit, it is likely that the maximum GC rounds should be increased.
815                bail!("BFT failed to retrieve the certificates for past round {round}");
816            };
817            // Filter the certificates to only include those that are in the traversal.
818            traversal = certificates
819                .into_values()
820                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
821                .collect();
822        }
823        Ok(traversal.contains(&previous_certificate))
824    }
825}
826
827impl<N: Network> BFT<N> {
828    /// Starts the BFT handlers.
829    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
830        let BFTReceiver {
831            mut rx_primary_round,
832            mut rx_primary_certificate,
833            mut rx_sync_bft_dag_at_bootup,
834            mut rx_sync_bft,
835        } = bft_receiver;
836
837        // Process the current round from the primary.
838        let self_ = self.clone();
839        self.spawn(async move {
840            while let Some((current_round, callback)) = rx_primary_round.recv().await {
841                callback.send(self_.update_to_next_round(current_round)).ok();
842            }
843        });
844
845        // Process the certificate from the primary.
846        let self_ = self.clone();
847        self.spawn(async move {
848            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
849                // Update the DAG with the certificate.
850                let result = self_.update_dag::<true, false>(certificate).await;
851                // Send the callback **after** updating the DAG.
852                // Note: We must await the DAG update before proceeding.
853                callback.send(result).ok();
854            }
855        });
856
857        // Process the request to sync the BFT DAG at bootup.
858        let self_ = self.clone();
859        self.spawn(async move {
860            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
861                self_.sync_bft_dag_at_bootup(certificates).await;
862            }
863        });
864
865        // Process the request to sync the BFT.
866        let self_ = self.clone();
867        self.spawn(async move {
868            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
869                // Update the DAG with the certificate.
870                let result = self_.update_dag::<true, true>(certificate).await;
871                // Send the callback **after** updating the DAG.
872                // Note: We must await the DAG update before proceeding.
873                callback.send(result).ok();
874            }
875        });
876    }
877
878    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
879    /// already exist in the ledger.
880    ///
881    /// This method commits all the certificates into the DAG.
882    /// Note that there is no need to insert the certificates into the DAG, because these certificates
883    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
884    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
885        // Acquire the BFT write lock.
886        let mut dag = self.dag.write();
887
888        // Commit all the certificates.
889        for certificate in certificates {
890            dag.commit(&certificate, self.storage().max_gc_rounds());
891        }
892    }
893
894    /// Spawns a task with the given future; it should only be used for long-running tasks.
895    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
896        self.handles.lock().push(tokio::spawn(future));
897    }
898
899    /// Shuts down the BFT.
900    pub async fn shut_down(&self) {
901        info!("Shutting down the BFT...");
902        // Acquire the lock.
903        let _lock = self.lock.lock().await;
904        // Shut down the primary.
905        self.primary.shut_down().await;
906        // Abort the tasks.
907        self.handles.lock().iter().for_each(|handle| handle.abort());
908    }
909}
910
911#[cfg(test)]
912mod tests {
913    use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
914    use snarkos_account::Account;
915    use snarkos_node_bft_ledger_service::MockLedgerService;
916    use snarkos_node_bft_storage_service::BFTMemoryService;
917    use snarkvm::{
918        console::account::{Address, PrivateKey},
919        ledger::{
920            committee::Committee,
921            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
922        },
923        utilities::TestRng,
924    };
925
926    use aleo_std::StorageMode;
927    use anyhow::Result;
928    use indexmap::{IndexMap, IndexSet};
929    use std::sync::Arc;
930
931    type CurrentNetwork = snarkvm::console::network::MainnetV0;
932
933    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
934    fn sample_test_instance(
935        committee_round: Option<u64>,
936        max_gc_rounds: u64,
937        rng: &mut TestRng,
938    ) -> (
939        Committee<CurrentNetwork>,
940        Account<CurrentNetwork>,
941        Arc<MockLedgerService<CurrentNetwork>>,
942        Storage<CurrentNetwork>,
943    ) {
944        let committee = match committee_round {
945            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
946            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
947        };
948        let account = Account::new(rng).unwrap();
949        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
950        let transmissions = Arc::new(BFTMemoryService::new());
951        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
952
953        (committee, account, ledger, storage)
954    }
955
956    #[test]
957    #[tracing_test::traced_test]
958    fn test_is_leader_quorum_odd() -> Result<()> {
959        let rng = &mut TestRng::default();
960
961        // Sample batch certificates.
962        let mut certificates = IndexSet::new();
963        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
964        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
965        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
966        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
967
968        // Initialize the committee.
969        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
970            1,
971            vec![
972                certificates[0].author(),
973                certificates[1].author(),
974                certificates[2].author(),
975                certificates[3].author(),
976            ],
977            rng,
978        );
979
980        // Initialize the ledger.
981        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
982        // Initialize the storage.
983        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
984        // Initialize the account.
985        let account = Account::new(rng)?;
986        // Initialize the BFT.
987        let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
988        assert!(bft.is_timer_expired());
989        // Ensure this call succeeds on an odd round.
990        let result = bft.is_leader_quorum_or_nonleaders_available(1);
991        // If timer has expired but quorum threshold is not reached, return 'false'.
992        assert!(!result);
993        // Insert certificates into storage.
994        for certificate in certificates.iter() {
995            storage.testing_only_insert_certificate_testing_only(certificate.clone());
996        }
997        // Ensure this call succeeds on an odd round.
998        let result = bft.is_leader_quorum_or_nonleaders_available(1);
999        assert!(result); // no previous leader certificate
1000        // Set the leader certificate.
1001        let leader_certificate = sample_batch_certificate(rng);
1002        *bft.leader_certificate.write() = Some(leader_certificate);
1003        // Ensure this call succeeds on an odd round.
1004        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1005        assert!(result); // should now fall through to the end of function
1006
1007        Ok(())
1008    }
1009
1010    #[test]
1011    #[tracing_test::traced_test]
1012    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1013        let rng = &mut TestRng::default();
1014
1015        // Sample the test instance.
1016        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1017        assert_eq!(committee.starting_round(), 1);
1018        assert_eq!(storage.current_round(), 1);
1019        assert_eq!(storage.max_gc_rounds(), 10);
1020
1021        // Initialize the BFT.
1022        let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1023        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1024
1025        // Store is at round 1, and we are checking for round 2.
1026        // Ensure this call fails on an even round.
1027        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1028        assert!(!result);
1029        Ok(())
1030    }
1031
1032    #[test]
1033    #[tracing_test::traced_test]
1034    fn test_is_leader_quorum_even() -> Result<()> {
1035        let rng = &mut TestRng::default();
1036
1037        // Sample the test instance.
1038        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1039        assert_eq!(committee.starting_round(), 2);
1040        assert_eq!(storage.current_round(), 2);
1041        assert_eq!(storage.max_gc_rounds(), 10);
1042
1043        // Initialize the BFT.
1044        let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1045        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1046
1047        // Ensure this call fails on an even round.
1048        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1049        assert!(!result);
1050        Ok(())
1051    }
1052
1053    #[test]
1054    #[tracing_test::traced_test]
1055    fn test_is_even_round_ready() -> Result<()> {
1056        let rng = &mut TestRng::default();
1057
1058        // Sample batch certificates.
1059        let mut certificates = IndexSet::new();
1060        certificates.insert(sample_batch_certificate_for_round(2, rng));
1061        certificates.insert(sample_batch_certificate_for_round(2, rng));
1062        certificates.insert(sample_batch_certificate_for_round(2, rng));
1063        certificates.insert(sample_batch_certificate_for_round(2, rng));
1064
1065        // Initialize the committee.
1066        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1067            2,
1068            vec![
1069                certificates[0].author(),
1070                certificates[1].author(),
1071                certificates[2].author(),
1072                certificates[3].author(),
1073            ],
1074            rng,
1075        );
1076
1077        // Initialize the ledger.
1078        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1079        // Initialize the storage.
1080        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1081        // Initialize the account.
1082        let account = Account::new(rng)?;
1083        // Initialize the BFT.
1084        let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1085        // Set the leader certificate.
1086        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1087        *bft.leader_certificate.write() = Some(leader_certificate);
1088        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1089        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1090        assert!(!result);
1091        // Once quorum threshold is reached, we are ready for the next round.
1092        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1093        assert!(result);
1094
1095        // Initialize a new BFT.
1096        let bft_timer =
1097            BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1098        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1099        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1100        if !bft_timer.is_timer_expired() {
1101            assert!(!result);
1102        }
1103        // Wait for the timer to expire.
1104        let leader_certificate_timeout =
1105            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1106        std::thread::sleep(leader_certificate_timeout);
1107        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1108        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1109        if bft_timer.is_timer_expired() {
1110            assert!(result);
1111        } else {
1112            assert!(!result);
1113        }
1114
1115        Ok(())
1116    }
1117
1118    #[test]
1119    #[tracing_test::traced_test]
1120    fn test_update_leader_certificate_odd() -> Result<()> {
1121        let rng = &mut TestRng::default();
1122
1123        // Sample the test instance.
1124        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1125        assert_eq!(storage.max_gc_rounds(), 10);
1126
1127        // Initialize the BFT.
1128        let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1129
1130        // Ensure this call fails on an odd round.
1131        let result = bft.update_leader_certificate_to_even_round(1);
1132        assert!(!result);
1133        Ok(())
1134    }
1135
1136    #[test]
1137    #[tracing_test::traced_test]
1138    fn test_update_leader_certificate_bad_round() -> Result<()> {
1139        let rng = &mut TestRng::default();
1140
1141        // Sample the test instance.
1142        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1143        assert_eq!(storage.max_gc_rounds(), 10);
1144
1145        // Initialize the BFT.
1146        let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1147
1148        // Ensure this call succeeds on an even round.
1149        let result = bft.update_leader_certificate_to_even_round(6);
1150        assert!(!result);
1151        Ok(())
1152    }
1153
1154    #[test]
1155    #[tracing_test::traced_test]
1156    fn test_update_leader_certificate_even() -> Result<()> {
1157        let rng = &mut TestRng::default();
1158
1159        // Set the current round.
1160        let current_round = 3;
1161
1162        // Sample the certificates.
1163        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1164            current_round,
1165            rng,
1166        );
1167
1168        // Initialize the committee.
1169        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1170            2,
1171            vec![
1172                certificates[0].author(),
1173                certificates[1].author(),
1174                certificates[2].author(),
1175                certificates[3].author(),
1176            ],
1177            rng,
1178        );
1179
1180        // Initialize the ledger.
1181        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1182
1183        // Initialize the storage.
1184        let transmissions = Arc::new(BFTMemoryService::new());
1185        let storage = Storage::new(ledger.clone(), transmissions, 10);
1186        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1187        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1188        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1189        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1190        assert_eq!(storage.current_round(), 2);
1191
1192        // Retrieve the leader certificate.
1193        let leader = committee.get_leader(2).unwrap();
1194        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1195
1196        // Initialize the BFT.
1197        let account = Account::new(rng)?;
1198        let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
1199
1200        // Set the leader certificate.
1201        *bft.leader_certificate.write() = Some(leader_certificate);
1202
1203        // Update the leader certificate.
1204        // Ensure this call succeeds on an even round.
1205        let result = bft.update_leader_certificate_to_even_round(2);
1206        assert!(result);
1207
1208        Ok(())
1209    }
1210
1211    #[tokio::test]
1212    #[tracing_test::traced_test]
1213    async fn test_order_dag_with_dfs() -> Result<()> {
1214        let rng = &mut TestRng::default();
1215
1216        // Sample the test instance.
1217        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1218
1219        // Initialize the round parameters.
1220        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1221        let current_round = previous_round + 1;
1222
1223        // Sample the current certificate and previous certificates.
1224        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1225            current_round,
1226            rng,
1227        );
1228
1229        /* Test GC */
1230
1231        // Ensure the function succeeds in returning only certificates above GC.
1232        {
1233            // Initialize the storage.
1234            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1235            // Initialize the BFT.
1236            let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1237
1238            // Insert a mock DAG in the BFT.
1239            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1240
1241            // Insert the previous certificates into the BFT.
1242            for certificate in previous_certificates.clone() {
1243                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1244            }
1245
1246            // Ensure this call succeeds and returns all given certificates.
1247            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1248            assert!(result.is_ok());
1249            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1250            assert_eq!(candidate_certificates.len(), 1);
1251            let expected_certificates = vec![certificate.clone()];
1252            assert_eq!(
1253                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1254                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1255            );
1256            assert_eq!(candidate_certificates, expected_certificates);
1257        }
1258
1259        /* Test normal case */
1260
1261        // Ensure the function succeeds in returning all given certificates.
1262        {
1263            // Initialize the storage.
1264            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1265            // Initialize the BFT.
1266            let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1267
1268            // Insert a mock DAG in the BFT.
1269            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1270
1271            // Insert the previous certificates into the BFT.
1272            for certificate in previous_certificates.clone() {
1273                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1274            }
1275
1276            // Ensure this call succeeds and returns all given certificates.
1277            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1278            assert!(result.is_ok());
1279            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1280            assert_eq!(candidate_certificates.len(), 5);
1281            let expected_certificates = vec![
1282                previous_certificates[0].clone(),
1283                previous_certificates[1].clone(),
1284                previous_certificates[2].clone(),
1285                previous_certificates[3].clone(),
1286                certificate,
1287            ];
1288            assert_eq!(
1289                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1290                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1291            );
1292            assert_eq!(candidate_certificates, expected_certificates);
1293        }
1294
1295        Ok(())
1296    }
1297
1298    #[test]
1299    #[tracing_test::traced_test]
1300    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1301        let rng = &mut TestRng::default();
1302
1303        // Sample the test instance.
1304        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1305        assert_eq!(committee.starting_round(), 1);
1306        assert_eq!(storage.current_round(), 1);
1307        assert_eq!(storage.max_gc_rounds(), 1);
1308
1309        // Initialize the round parameters.
1310        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1311        let current_round = previous_round + 1;
1312
1313        // Sample the current certificate and previous certificates.
1314        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1315            current_round,
1316            rng,
1317        );
1318        // Construct the previous certificate IDs.
1319        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1320
1321        /* Test missing previous certificate. */
1322
1323        // Initialize the BFT.
1324        let bft = BFT::new(account, storage, ledger, None, &[], StorageMode::new_test(None))?;
1325
1326        // The expected error message.
1327        let error_msg = format!(
1328            "Missing previous certificate {} for round {previous_round}",
1329            crate::helpers::fmt_id(previous_certificate_ids[3]),
1330        );
1331
1332        // Ensure this call fails on a missing previous certificate.
1333        let result = bft.order_dag_with_dfs::<false>(certificate);
1334        assert!(result.is_err());
1335        assert_eq!(result.unwrap_err().to_string(), error_msg);
1336        Ok(())
1337    }
1338
1339    #[tokio::test]
1340    #[tracing_test::traced_test]
1341    async fn test_bft_gc_on_commit() -> Result<()> {
1342        let rng = &mut TestRng::default();
1343
1344        // Initialize the round parameters.
1345        let max_gc_rounds = 1;
1346        let committee_round = 0;
1347        let commit_round = 2;
1348        let current_round = commit_round + 1;
1349
1350        // Sample the certificates.
1351        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1352            current_round,
1353            rng,
1354        );
1355
1356        // Initialize the committee.
1357        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1358            committee_round,
1359            vec![
1360                certificates[0].author(),
1361                certificates[1].author(),
1362                certificates[2].author(),
1363                certificates[3].author(),
1364            ],
1365            rng,
1366        );
1367
1368        // Initialize the ledger.
1369        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1370
1371        // Initialize the storage.
1372        let transmissions = Arc::new(BFTMemoryService::new());
1373        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1374        // Insert the certificates into the storage.
1375        for certificate in certificates.iter() {
1376            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1377        }
1378
1379        // Get the leader certificate.
1380        let leader = committee.get_leader(commit_round).unwrap();
1381        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1382
1383        // Initialize the BFT.
1384        let account = Account::new(rng)?;
1385        let bft = BFT::new(account, storage.clone(), ledger, None, &[], StorageMode::new_test(None))?;
1386        // Insert a mock DAG in the BFT.
1387        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1388
1389        // Ensure that the `gc_round` has not been updated yet.
1390        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1391
1392        // Insert the certificates into the BFT.
1393        for certificate in certificates {
1394            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1395        }
1396
1397        // Commit the leader certificate.
1398        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1399
1400        // Ensure that the `gc_round` has been updated.
1401        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1402
1403        Ok(())
1404    }
1405
1406    #[tokio::test]
1407    #[tracing_test::traced_test]
1408    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1409        let rng = &mut TestRng::default();
1410
1411        // Initialize the round parameters.
1412        let max_gc_rounds = 1;
1413        let committee_round = 0;
1414        let commit_round = 2;
1415        let current_round = commit_round + 1;
1416
1417        // Sample the current certificate and previous certificates.
1418        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1419            current_round,
1420            rng,
1421        );
1422
1423        // Initialize the committee.
1424        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1425            committee_round,
1426            vec![
1427                certificates[0].author(),
1428                certificates[1].author(),
1429                certificates[2].author(),
1430                certificates[3].author(),
1431            ],
1432            rng,
1433        );
1434
1435        // Initialize the ledger.
1436        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1437
1438        // Initialize the storage.
1439        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1440        // Insert the certificates into the storage.
1441        for certificate in certificates.iter() {
1442            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1443        }
1444
1445        // Get the leader certificate.
1446        let leader = committee.get_leader(commit_round).unwrap();
1447        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1448
1449        // Initialize the BFT.
1450        let account = Account::new(rng)?;
1451        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1452
1453        // Insert a mock DAG in the BFT.
1454        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1455
1456        // Insert the previous certificates into the BFT.
1457        for certificate in certificates.clone() {
1458            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1459        }
1460
1461        // Commit the leader certificate.
1462        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1463
1464        // Simulate a bootup of the BFT.
1465
1466        // Initialize a new instance of storage.
1467        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1468        // Initialize a new instance of BFT.
1469        let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], StorageMode::new_test(None))?;
1470
1471        // Sync the BFT DAG at bootup.
1472        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1473
1474        // Check that the BFT starts from the same last committed round.
1475        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1476
1477        // Ensure that both BFTs have committed the leader certificate.
1478        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1479        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1480
1481        // Check the state of the bootup BFT.
1482        for certificate in certificates {
1483            let certificate_round = certificate.round();
1484            let certificate_id = certificate.id();
1485            // Check that the bootup BFT has committed the certificates.
1486            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1487            // Check that the bootup BFT does not contain the certificates in its graph, because
1488            // it should not need to order them again in subsequent subdags.
1489            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1490        }
1491
1492        Ok(())
1493    }
1494
1495    #[tokio::test]
1496    #[tracing_test::traced_test]
1497    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1498        /*
1499        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1500        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1501        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1502        */
1503
1504        let rng = &mut TestRng::default();
1505
1506        // Initialize the round parameters.
1507        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1508        let committee_round = 0;
1509        let commit_round = 2;
1510        let current_round = commit_round + 1;
1511        let next_round = current_round + 1;
1512
1513        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1514        let (round_to_certificates_map, committee) = {
1515            let private_keys = vec![
1516                PrivateKey::new(rng).unwrap(),
1517                PrivateKey::new(rng).unwrap(),
1518                PrivateKey::new(rng).unwrap(),
1519                PrivateKey::new(rng).unwrap(),
1520            ];
1521            let addresses = vec![
1522                Address::try_from(private_keys[0])?,
1523                Address::try_from(private_keys[1])?,
1524                Address::try_from(private_keys[2])?,
1525                Address::try_from(private_keys[3])?,
1526            ];
1527            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1528                committee_round,
1529                addresses,
1530                rng,
1531            );
1532            // Initialize a mapping from the round number to the set of batch certificates in the round.
1533            let mut round_to_certificates_map: IndexMap<
1534                u64,
1535                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1536            > = IndexMap::new();
1537            let mut previous_certificates = IndexSet::with_capacity(4);
1538            // Initialize the genesis batch certificates.
1539            for _ in 0..4 {
1540                previous_certificates.insert(sample_batch_certificate(rng));
1541            }
1542            for round in 0..commit_round + 3 {
1543                let mut current_certificates = IndexSet::new();
1544                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1545                    IndexSet::new()
1546                } else {
1547                    previous_certificates.iter().map(|c| c.id()).collect()
1548                };
1549                let transmission_ids =
1550                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1551                        .into_iter()
1552                        .collect::<IndexSet<_>>();
1553                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1554                let committee_id = committee.id();
1555                for (i, private_key_1) in private_keys.iter().enumerate() {
1556                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1557                        private_key_1,
1558                        round,
1559                        timestamp,
1560                        committee_id,
1561                        transmission_ids.clone(),
1562                        previous_certificate_ids.clone(),
1563                        rng,
1564                    )
1565                    .unwrap();
1566                    let mut signatures = IndexSet::with_capacity(4);
1567                    for (j, private_key_2) in private_keys.iter().enumerate() {
1568                        if i != j {
1569                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1570                        }
1571                    }
1572                    let certificate =
1573                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1574                    current_certificates.insert(certificate);
1575                }
1576                // Update the mapping.
1577                round_to_certificates_map.insert(round, current_certificates.clone());
1578                previous_certificates = current_certificates.clone();
1579            }
1580            (round_to_certificates_map, committee)
1581        };
1582
1583        // Initialize the ledger.
1584        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1585        // Initialize the storage.
1586        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1587        // Get the leaders for the next 2 commit rounds.
1588        let leader = committee.get_leader(commit_round).unwrap();
1589        let next_leader = committee.get_leader(next_round).unwrap();
1590        // Insert the pre shutdown certificates into the storage.
1591        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1592        for i in 1..=commit_round {
1593            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1594            if i == commit_round {
1595                // Only insert the leader certificate for the commit round.
1596                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1597                if let Some(c) = leader_certificate {
1598                    pre_shutdown_certificates.push(c.clone());
1599                }
1600                continue;
1601            }
1602            pre_shutdown_certificates.extend(certificates);
1603        }
1604        for certificate in pre_shutdown_certificates.iter() {
1605            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1606        }
1607        // Insert the post shutdown certificates into the storage.
1608        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1609            Vec::new();
1610        for j in commit_round..=commit_round + 2 {
1611            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1612            post_shutdown_certificates.extend(certificate);
1613        }
1614        for certificate in post_shutdown_certificates.iter() {
1615            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1616        }
1617        // Get the leader certificates.
1618        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1619        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1620
1621        // Initialize the BFT without bootup.
1622        let account = Account::new(rng)?;
1623        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], StorageMode::new_test(None))?;
1624
1625        // Insert a mock DAG in the BFT without bootup.
1626        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1627
1628        // Insert the certificates into the BFT without bootup.
1629        for certificate in pre_shutdown_certificates.clone() {
1630            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1631        }
1632
1633        // Insert the post shutdown certificates into the BFT without bootup.
1634        for certificate in post_shutdown_certificates.clone() {
1635            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1636        }
1637        // Commit the second leader certificate.
1638        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1639        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1640        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1641
1642        // Simulate a bootup of the BFT.
1643
1644        // Initialize a new instance of storage.
1645        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1646
1647        // Initialize a new instance of BFT with bootup.
1648        let bootup_bft =
1649            BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1650
1651        // Sync the BFT DAG at bootup.
1652        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1653
1654        // Insert the post shutdown certificates to the storage and BFT with bootup.
1655        for certificate in post_shutdown_certificates.iter() {
1656            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1657        }
1658        for certificate in post_shutdown_certificates.clone() {
1659            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1660        }
1661        // Commit the second leader certificate.
1662        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1663        let commit_subdag_metadata_bootup =
1664            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1665        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1666        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1667
1668        // Check that the final state of both BFTs is the same.
1669
1670        // Check that both BFTs start from the same last committed round.
1671        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1672
1673        // Ensure that both BFTs have committed the leader certificates.
1674        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1675        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1676        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1677        assert!(
1678            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1679        );
1680
1681        // Check that the bootup BFT has committed the pre shutdown certificates.
1682        for certificate in pre_shutdown_certificates.clone() {
1683            let certificate_round = certificate.round();
1684            let certificate_id = certificate.id();
1685            // Check that both BFTs have committed the certificates.
1686            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1687            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1688            // Check that the bootup BFT does not contain the certificates in its graph, because
1689            // it should not need to order them again in subsequent subdags.
1690            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1691            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1692        }
1693
1694        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1695        for certificate in committed_certificates_bootup.clone() {
1696            let certificate_round = certificate.round();
1697            let certificate_id = certificate.id();
1698            // Check that the both BFTs have committed the certificates.
1699            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1700            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1701            // Check that the bootup BFT does not contain the certificates in its graph, because
1702            // it should not need to order them again in subsequent subdags.
1703            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1704            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1705        }
1706
1707        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1708        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1709
1710        Ok(())
1711    }
1712
1713    #[tokio::test]
1714    #[tracing_test::traced_test]
1715    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1716        /*
1717        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1718        2. Add post shutdown certificates to the bootup BFT.
1719        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1720        */
1721
1722        let rng = &mut TestRng::default();
1723
1724        // Initialize the round parameters.
1725        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1726        let committee_round = 0;
1727        let commit_round = 2;
1728        let current_round = commit_round + 1;
1729        let next_round = current_round + 1;
1730
1731        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1732        let (round_to_certificates_map, committee) = {
1733            let private_keys = vec![
1734                PrivateKey::new(rng).unwrap(),
1735                PrivateKey::new(rng).unwrap(),
1736                PrivateKey::new(rng).unwrap(),
1737                PrivateKey::new(rng).unwrap(),
1738            ];
1739            let addresses = vec![
1740                Address::try_from(private_keys[0])?,
1741                Address::try_from(private_keys[1])?,
1742                Address::try_from(private_keys[2])?,
1743                Address::try_from(private_keys[3])?,
1744            ];
1745            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1746                committee_round,
1747                addresses,
1748                rng,
1749            );
1750            // Initialize a mapping from the round number to the set of batch certificates in the round.
1751            let mut round_to_certificates_map: IndexMap<
1752                u64,
1753                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1754            > = IndexMap::new();
1755            let mut previous_certificates = IndexSet::with_capacity(4);
1756            // Initialize the genesis batch certificates.
1757            for _ in 0..4 {
1758                previous_certificates.insert(sample_batch_certificate(rng));
1759            }
1760            for round in 0..=commit_round + 2 {
1761                let mut current_certificates = IndexSet::new();
1762                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1763                    IndexSet::new()
1764                } else {
1765                    previous_certificates.iter().map(|c| c.id()).collect()
1766                };
1767                let transmission_ids =
1768                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1769                        .into_iter()
1770                        .collect::<IndexSet<_>>();
1771                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1772                let committee_id = committee.id();
1773                for (i, private_key_1) in private_keys.iter().enumerate() {
1774                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1775                        private_key_1,
1776                        round,
1777                        timestamp,
1778                        committee_id,
1779                        transmission_ids.clone(),
1780                        previous_certificate_ids.clone(),
1781                        rng,
1782                    )
1783                    .unwrap();
1784                    let mut signatures = IndexSet::with_capacity(4);
1785                    for (j, private_key_2) in private_keys.iter().enumerate() {
1786                        if i != j {
1787                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1788                        }
1789                    }
1790                    let certificate =
1791                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1792                    current_certificates.insert(certificate);
1793                }
1794                // Update the mapping.
1795                round_to_certificates_map.insert(round, current_certificates.clone());
1796                previous_certificates = current_certificates.clone();
1797            }
1798            (round_to_certificates_map, committee)
1799        };
1800
1801        // Initialize the ledger.
1802        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1803        // Initialize the storage.
1804        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1805        // Get the leaders for the next 2 commit rounds.
1806        let leader = committee.get_leader(commit_round).unwrap();
1807        let next_leader = committee.get_leader(next_round).unwrap();
1808        // Insert the pre shutdown certificates into the storage.
1809        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1810        for i in 1..=commit_round {
1811            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1812            if i == commit_round {
1813                // Only insert the leader certificate for the commit round.
1814                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1815                if let Some(c) = leader_certificate {
1816                    pre_shutdown_certificates.push(c.clone());
1817                }
1818                continue;
1819            }
1820            pre_shutdown_certificates.extend(certificates);
1821        }
1822        for certificate in pre_shutdown_certificates.iter() {
1823            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1824        }
1825        // Initialize the bootup BFT.
1826        let account = Account::new(rng)?;
1827        let bootup_bft =
1828            BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], StorageMode::new_test(None))?;
1829        // Insert a mock DAG in the BFT without bootup.
1830        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1831        // Sync the BFT DAG at bootup.
1832        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1833
1834        // Insert the post shutdown certificates into the storage.
1835        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1836            Vec::new();
1837        for j in commit_round..=commit_round + 2 {
1838            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1839            post_shutdown_certificates.extend(certificate);
1840        }
1841        for certificate in post_shutdown_certificates.iter() {
1842            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1843        }
1844
1845        // Insert the post shutdown certificates into the DAG.
1846        for certificate in post_shutdown_certificates.clone() {
1847            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1848        }
1849
1850        // Get the next leader certificate to commit.
1851        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1852        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1853        let committed_certificates = commit_subdag.values().flatten();
1854
1855        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1856        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1857            for committed_certificate in committed_certificates.clone() {
1858                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1859            }
1860        }
1861        Ok(())
1862    }
1863}