snarkos_node_bft/
bft.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkos_node_sync::{BlockSync, Ping};
34use snarkvm::{
35    console::account::Address,
36    ledger::{
37        block::Transaction,
38        committee::Committee,
39        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
40        puzzle::{Solution, SolutionID},
41    },
42    prelude::{Field, Network, Result, bail, ensure},
43};
44
45use aleo_std::StorageMode;
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50    parking_lot::{Mutex, RwLock},
51    tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56    collections::{BTreeMap, HashSet},
57    future::Future,
58    net::SocketAddr,
59    sync::{
60        Arc,
61        atomic::{AtomicI64, Ordering},
62    },
63};
64#[cfg(not(feature = "locktick"))]
65use tokio::sync::Mutex as TMutex;
66use tokio::{
67    sync::{OnceCell, oneshot},
68    task::JoinHandle,
69};
70
/// The BFT instance of a node.
///
/// Bundles the node's [`Primary`] with the DAG of batch certificates, the leader certificate
/// of the current even round (and its timer), the optional consensus sender, the handles of
/// spawned tasks, and an async lock.
///
/// Every field other than `primary` is wrapped in an `Arc`, so clones of this struct share
/// that state.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary for this node.
    primary: Primary<N>,
    /// The DAG of batches from which we build the blockchain.
    dag: Arc<RwLock<DAG<N>>>,
    /// The batch certificate of the leader from the current even round, if one was present.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timer for the leader certificate to be received.
    /// Holds the `now()` timestamp stored when the BFT last advanced to the next round.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The consensus sender. It is set at most once, at the end of `run`.
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// Handles for all spawned tasks.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The BFT lock. It is held for the duration of `update_dag`.
    lock: Arc<TMutex<()>>,
}
88
89impl<N: Network> BFT<N> {
90    /// Initializes a new instance of the BFT.
91    #[allow(clippy::too_many_arguments)]
92    pub fn new(
93        account: Account<N>,
94        storage: Storage<N>,
95        ledger: Arc<dyn LedgerService<N>>,
96        block_sync: Arc<BlockSync<N>>,
97        ip: Option<SocketAddr>,
98        trusted_validators: &[SocketAddr],
99        storage_mode: StorageMode,
100        dev: Option<u16>,
101    ) -> Result<Self> {
102        Ok(Self {
103            primary: Primary::new(account, storage, ledger, block_sync, ip, trusted_validators, storage_mode, dev)?,
104            dag: Default::default(),
105            leader_certificate: Default::default(),
106            leader_certificate_timer: Default::default(),
107            consensus_sender: Default::default(),
108            handles: Default::default(),
109            lock: Default::default(),
110        })
111    }
112
113    /// Run the BFT instance.
114    ///
115    /// This will return as soon as all required tasks are spawned.
116    /// The function must not be called more than once per instance.
117    pub async fn run(
118        &mut self,
119        ping: Option<Arc<Ping<N>>>,
120        consensus_sender: Option<ConsensusSender<N>>,
121        primary_sender: PrimarySender<N>,
122        primary_receiver: PrimaryReceiver<N>,
123    ) -> Result<()> {
124        info!("Starting the BFT instance...");
125        // Initialize the BFT channels.
126        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
127        // First, start the BFT handlers.
128        self.start_handlers(bft_receiver);
129        // Next, run the primary instance.
130        self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
131        // Lastly, set the consensus sender.
132        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
133        if let Some(consensus_sender) = consensus_sender {
134            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
135        }
136        Ok(())
137    }
138
139    /// Returns `true` if the primary is synced.
140    pub fn is_synced(&self) -> bool {
141        self.primary.is_synced()
142    }
143
144    /// Returns the primary.
145    pub const fn primary(&self) -> &Primary<N> {
146        &self.primary
147    }
148
149    /// Returns the storage.
150    pub const fn storage(&self) -> &Storage<N> {
151        self.primary.storage()
152    }
153
154    /// Returns the ledger.
155    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
156        self.primary.ledger()
157    }
158
159    /// Returns the leader of the current even round, if one was present.
160    pub fn leader(&self) -> Option<Address<N>> {
161        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
162    }
163
164    /// Returns the certificate of the leader from the current even round, if one was present.
165    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
166        &self.leader_certificate
167    }
168}
169
170impl<N: Network> BFT<N> {
171    /// Returns the number of unconfirmed transmissions.
172    pub fn num_unconfirmed_transmissions(&self) -> usize {
173        self.primary.num_unconfirmed_transmissions()
174    }
175
176    /// Returns the number of unconfirmed ratifications.
177    pub fn num_unconfirmed_ratifications(&self) -> usize {
178        self.primary.num_unconfirmed_ratifications()
179    }
180
181    /// Returns the number of solutions.
182    pub fn num_unconfirmed_solutions(&self) -> usize {
183        self.primary.num_unconfirmed_solutions()
184    }
185
186    /// Returns the number of unconfirmed transactions.
187    pub fn num_unconfirmed_transactions(&self) -> usize {
188        self.primary.num_unconfirmed_transactions()
189    }
190}
191
192impl<N: Network> BFT<N> {
193    /// Returns the worker transmission IDs.
194    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
195        self.primary.worker_transmission_ids()
196    }
197
198    /// Returns the worker transmissions.
199    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
200        self.primary.worker_transmissions()
201    }
202
203    /// Returns the worker solutions.
204    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
205        self.primary.worker_solutions()
206    }
207
208    /// Returns the worker transactions.
209    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
210        self.primary.worker_transactions()
211    }
212}
213
214impl<N: Network> BFT<N> {
215    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
216    fn update_to_next_round(&self, current_round: u64) -> bool {
217        // Ensure the current round is at least the storage round (this is a sanity check).
218        let storage_round = self.storage().current_round();
219        if current_round < storage_round {
220            debug!(
221                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
222            );
223            return false;
224        }
225
226        // Determine if the BFT is ready to update to the next round.
227        let is_ready = match current_round % 2 == 0 {
228            true => self.update_leader_certificate_to_even_round(current_round),
229            false => self.is_leader_quorum_or_nonleaders_available(current_round),
230        };
231
232        #[cfg(feature = "metrics")]
233        {
234            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
235            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
236            if start > 0 {
237                let end = now();
238                let elapsed = std::time::Duration::from_secs((end - start) as u64);
239                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
240            }
241        }
242
243        // Log whether the round is going to update.
244        if current_round % 2 == 0 {
245            // Determine if there is a leader certificate.
246            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
247                // Ensure the state of the leader certificate is consistent with the BFT being ready.
248                if !is_ready {
249                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
250                }
251                // Log the leader election.
252                let leader_round = leader_certificate.round();
253                match leader_round == current_round {
254                    true => {
255                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
256                        #[cfg(feature = "metrics")]
257                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
258                    }
259                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
260                }
261            } else {
262                match is_ready {
263                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
264                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
265                }
266            }
267        }
268
269        // If the BFT is ready, then update to the next round.
270        if is_ready {
271            // Update to the next round in storage.
272            if let Err(e) = self.storage().increment_to_next_round(current_round) {
273                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
274                return false;
275            }
276            // Update the timer for the leader certificate.
277            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
278        }
279
280        is_ready
281    }
282
283    /// Updates the leader certificate to the current even round,
284    /// returning `true` if the BFT is ready to update to the next round.
285    ///
286    /// This method runs on every even round, by determining the leader of the current even round,
287    /// and setting the leader certificate to their certificate in the round, if they were present.
288    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
289        // Retrieve the current round.
290        let current_round = self.storage().current_round();
291        // Ensure the current round matches the given round.
292        if current_round != even_round {
293            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
294            return false;
295        }
296
297        // If the current round is odd, return false.
298        if current_round % 2 != 0 || current_round < 2 {
299            error!("BFT cannot update the leader certificate in an odd round");
300            return false;
301        }
302
303        // Retrieve the certificates for the current round.
304        let current_certificates = self.storage().get_certificates_for_round(current_round);
305        // If there are no current certificates, set the leader certificate to 'None', and return early.
306        if current_certificates.is_empty() {
307            // Set the leader certificate to 'None'.
308            *self.leader_certificate.write() = None;
309            return false;
310        }
311
312        // Retrieve the committee lookback of the current round.
313        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
314            Ok(committee) => committee,
315            Err(e) => {
316                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
317                return false;
318            }
319        };
320        // Determine the leader of the current round.
321        let leader = match self.ledger().latest_leader() {
322            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
323            _ => {
324                // Compute the leader for the current round.
325                let computed_leader = match committee_lookback.get_leader(current_round) {
326                    Ok(leader) => leader,
327                    Err(e) => {
328                        error!("BFT failed to compute the leader for the even round {current_round} - {e}");
329                        return false;
330                    }
331                };
332
333                // Cache the computed leader.
334                self.ledger().update_latest_leader(current_round, computed_leader);
335
336                computed_leader
337            }
338        };
339        // Find and set the leader certificate, if the leader was present in the current even round.
340        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
341        *self.leader_certificate.write() = leader_certificate.cloned();
342
343        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
344    }
345
346    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
347    ///  - If the leader certificate is set for the current even round.
348    ///  - The timer for the leader certificate has expired.
349    fn is_even_round_ready_for_next_round(
350        &self,
351        certificates: IndexSet<BatchCertificate<N>>,
352        committee: Committee<N>,
353        current_round: u64,
354    ) -> bool {
355        // Retrieve the authors for the current round.
356        let authors = certificates.into_iter().map(|c| c.author()).collect();
357        // Check if quorum threshold is reached.
358        if !committee.is_quorum_threshold_reached(&authors) {
359            trace!("BFT failed to reach quorum threshold in even round {current_round}");
360            return false;
361        }
362        // If the leader certificate is set for the current even round, return 'true'.
363        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
364            if leader_certificate.round() == current_round {
365                return true;
366            }
367        }
368        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
369        if self.is_timer_expired() {
370            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
371            return true;
372        }
373        // Otherwise, return 'false'.
374        false
375    }
376
377    /// Returns `true` if the timer for the leader certificate has expired.
378    ///
379    /// This is always true for a new BFT instance.
380    fn is_timer_expired(&self) -> bool {
381        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
382    }
383
384    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
385    ///  - The leader certificate is `None`.
386    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
387    ///  - The leader certificate timer has expired.
388    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
389        // Retrieve the current round.
390        let current_round = self.storage().current_round();
391        // Ensure the current round matches the given round.
392        if current_round != odd_round {
393            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
394            return false;
395        }
396        // If the current round is even, return false.
397        if current_round % 2 != 1 {
398            error!("BFT does not compute stakes for the leader certificate in an even round");
399            return false;
400        }
401        // Retrieve the certificates for the current round.
402        let current_certificates = self.storage().get_certificates_for_round(current_round);
403        // Retrieve the committee lookback for the current round.
404        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
405            Ok(committee) => committee,
406            Err(e) => {
407                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
408                return false;
409            }
410        };
411        // Retrieve the authors of the current certificates.
412        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
413        // Check if quorum threshold is reached.
414        if !committee_lookback.is_quorum_threshold_reached(&authors) {
415            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
416            return false;
417        }
418        // Retrieve the leader certificate.
419        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
420            // If there is no leader certificate for the previous round, return 'true'.
421            return true;
422        };
423        // Compute the stake for the leader certificate.
424        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
425            leader_certificate.id(),
426            current_certificates,
427            &committee_lookback,
428        );
429        // Return 'true' if any of the following conditions hold:
430        stake_with_leader >= committee_lookback.availability_threshold()
431            || stake_without_leader >= committee_lookback.quorum_threshold()
432            || self.is_timer_expired()
433    }
434
435    /// Computes the amount of stake that has & has not signed for the leader certificate.
436    fn compute_stake_for_leader_certificate(
437        &self,
438        leader_certificate_id: Field<N>,
439        current_certificates: IndexSet<BatchCertificate<N>>,
440        current_committee: &Committee<N>,
441    ) -> (u64, u64) {
442        // If there are no current certificates, return early.
443        if current_certificates.is_empty() {
444            return (0, 0);
445        }
446
447        // Initialize a tracker for the stake with the leader.
448        let mut stake_with_leader = 0u64;
449        // Initialize a tracker for the stake without the leader.
450        let mut stake_without_leader = 0u64;
451        // Iterate over the current certificates.
452        for certificate in current_certificates {
453            // Retrieve the stake for the author of the certificate.
454            let stake = current_committee.get_stake(certificate.author());
455            // Determine if the certificate includes the leader.
456            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
457                // If the certificate includes the leader, add the stake to the stake with the leader.
458                true => stake_with_leader = stake_with_leader.saturating_add(stake),
459                // If the certificate does not include the leader, add the stake to the stake without the leader.
460                false => stake_without_leader = stake_without_leader.saturating_add(stake),
461            }
462        }
463        // Return the stake with the leader, and the stake without the leader.
464        (stake_with_leader, stake_without_leader)
465    }
466}
467
468impl<N: Network> BFT<N> {
469    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
470    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
471        &self,
472        certificate: BatchCertificate<N>,
473    ) -> Result<()> {
474        // Acquire the BFT lock.
475        let _lock = self.lock.lock().await;
476
477        // Retrieve the round of the new certificate to add to the DAG.
478        let certificate_round = certificate.round();
479
480        // Insert the certificate into the DAG.
481        self.dag.write().insert(certificate);
482
483        // Get the previous round number.
484        let commit_round = certificate_round.saturating_sub(1);
485
486        // Leaders are elected in even rounds.
487        // If the previous round is odd, the current round cannot commit any leader certs.
488        if commit_round % 2 != 0 || commit_round < 2 {
489            return Ok(());
490        }
491        // If the commit round is at or below the last committed round, return early.
492        if commit_round <= self.dag.read().last_committed_round() {
493            return Ok(());
494        }
495
496        /* Proceeding to check if the leader is ready to be committed. */
497        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
498
499        // Retrieve the committee lookback for the commit round.
500        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
501            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
502        };
503
504        // Either retrieve the cached leader or compute it.
505        let leader = match self.ledger().latest_leader() {
506            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
507            _ => {
508                // Compute the leader for the commit round.
509                let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
510                    bail!("BFT failed to compute the leader for commit round {commit_round}");
511                };
512
513                // Cache the computed leader.
514                self.ledger().update_latest_leader(commit_round, computed_leader);
515
516                computed_leader
517            }
518        };
519
520        // Retrieve the leader certificate for the commit round.
521        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
522        else {
523            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
524            return Ok(());
525        };
526        // Retrieve all of the certificates for the **certificate** round.
527        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
528            // TODO (howardwu): Investigate how many certificates we should have at this point.
529            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
530        };
531        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
532        let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
533        else {
534            bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
535        };
536        // Construct a set over the authors who included the leader's certificate in the certificate round.
537        let authors = certificates
538            .values()
539            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
540                true => Some(c.author()),
541                false => None,
542            })
543            .collect();
544        // Check if the leader is ready to be committed.
545        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
546            // If the leader is not ready to be committed, return early.
547            trace!("BFT is not ready to commit {commit_round}");
548            return Ok(());
549        }
550
551        /* Proceeding to commit the leader. */
552        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
553
554        // Commit the leader certificate, and all previous leader certificates since the last committed round.
555        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
556    }
557
558    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
559    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
560        &self,
561        leader_certificate: BatchCertificate<N>,
562    ) -> Result<()> {
563        // Fetch the leader round.
564        let latest_leader_round = leader_certificate.round();
565        // Determine the list of all previous leader certificates since the last committed round.
566        // The order of the leader certificates is from **newest** to **oldest**.
567        let mut leader_certificates = vec![leader_certificate.clone()];
568        {
569            // Retrieve the leader round.
570            let leader_round = leader_certificate.round();
571
572            let mut current_certificate = leader_certificate;
573            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
574            {
575                // Retrieve the previous committee for the leader round.
576                let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
577                    Ok(committee) => committee,
578                    Err(e) => {
579                        bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
580                    }
581                };
582                // Either retrieve the cached leader or compute it.
583                let leader = match self.ledger().latest_leader() {
584                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
585                    _ => {
586                        // Compute the leader for the commit round.
587                        let computed_leader = match previous_committee_lookback.get_leader(round) {
588                            Ok(leader) => leader,
589                            Err(e) => {
590                                bail!("BFT failed to compute the leader for the even round {round} - {e}");
591                            }
592                        };
593
594                        // Cache the computed leader.
595                        self.ledger().update_latest_leader(round, computed_leader);
596
597                        computed_leader
598                    }
599                };
600                // Retrieve the previous leader certificate.
601                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
602                else {
603                    continue;
604                };
605                // Determine if there is a path between the previous certificate and the current certificate.
606                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
607                    // Add the previous leader certificate to the list of certificates to commit.
608                    leader_certificates.push(previous_certificate.clone());
609                    // Update the current certificate to the previous leader certificate.
610                    current_certificate = previous_certificate;
611                }
612            }
613        }
614
615        // Iterate over the leader certificates to commit.
616        for leader_certificate in leader_certificates.into_iter().rev() {
617            // Retrieve the leader certificate round.
618            let leader_round = leader_certificate.round();
619            // Compute the commit subdag.
620            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
621                Ok(subdag) => subdag,
622                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
623            };
624            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
625            if !IS_SYNCING {
626                // Initialize a map for the deduped transmissions.
627                let mut transmissions = IndexMap::new();
628                // Initialize a map for the deduped transaction ids.
629                let mut seen_transaction_ids = IndexSet::new();
630                // Initialize a map for the deduped solution ids.
631                let mut seen_solution_ids = IndexSet::new();
632                // Start from the oldest leader certificate.
633                for certificate in commit_subdag.values().flatten() {
634                    // Retrieve the transmissions.
635                    for transmission_id in certificate.transmission_ids() {
636                        // If the transaction ID or solution ID already exists in the map, skip it.
637                        // Note: This additional check is done to ensure that we do not include duplicate
638                        // transaction IDs or solution IDs that may have a different transmission ID.
639                        match transmission_id {
640                            TransmissionID::Solution(solution_id, _) => {
641                                // If the solution already exists, skip it.
642                                if seen_solution_ids.contains(&solution_id) {
643                                    continue;
644                                }
645                            }
646                            TransmissionID::Transaction(transaction_id, _) => {
647                                // If the transaction already exists, skip it.
648                                if seen_transaction_ids.contains(transaction_id) {
649                                    continue;
650                                }
651                            }
652                            TransmissionID::Ratification => {
653                                bail!("Ratifications are currently not supported in the BFT.")
654                            }
655                        }
656                        // If the transmission already exists in the map, skip it.
657                        if transmissions.contains_key(transmission_id) {
658                            continue;
659                        }
660                        // If the transmission already exists in the ledger, skip it.
661                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
662                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
663                            continue;
664                        }
665                        // Retrieve the transmission.
666                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
667                            bail!(
668                                "BFT failed to retrieve transmission '{}.{}' from round {}",
669                                fmt_id(transmission_id),
670                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
671                                certificate.round()
672                            );
673                        };
674                        // Insert the transaction ID or solution ID into the map.
675                        match transmission_id {
676                            TransmissionID::Solution(id, _) => {
677                                seen_solution_ids.insert(id);
678                            }
679                            TransmissionID::Transaction(id, _) => {
680                                seen_transaction_ids.insert(id);
681                            }
682                            TransmissionID::Ratification => {}
683                        }
684                        // Add the transmission to the set.
685                        transmissions.insert(*transmission_id, transmission);
686                    }
687                }
688                // Trigger consensus, as this will build a new block for the ledger.
689                // Construct the subdag.
690                let subdag = Subdag::from(commit_subdag.clone())?;
691                // Retrieve the anchor round.
692                let anchor_round = subdag.anchor_round();
693                // Retrieve the number of transmissions.
694                let num_transmissions = transmissions.len();
695                // Retrieve metadata about the subdag.
696                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
697
698                // Ensure the subdag anchor round matches the leader round.
699                ensure!(
700                    anchor_round == leader_round,
701                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
702                );
703
704                // Trigger consensus.
705                if let Some(consensus_sender) = self.consensus_sender.get() {
706                    // Initialize a callback sender and receiver.
707                    let (callback_sender, callback_receiver) = oneshot::channel();
708                    // Send the subdag and transmissions to consensus.
709                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
710                    // Await the callback to continue.
711                    match callback_receiver.await {
712                        Ok(Ok(())) => (), // continue
713                        Ok(Err(e)) => {
714                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
715                            return Ok(());
716                        }
717                        Err(e) => {
718                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
719                            return Ok(());
720                        }
721                    }
722                }
723
724                info!(
725                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
726                );
727            }
728
729            // Update the DAG, as the subdag was successfully included into a block.
730            let mut dag_write = self.dag.write();
731            for certificate in commit_subdag.values().flatten() {
732                dag_write.commit(certificate, self.storage().max_gc_rounds());
733            }
734
735            // Update the validator telemetry.
736            #[cfg(feature = "telemetry")]
737            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
738        }
739
740        // Perform garbage collection based on the latest committed leader round.
741        // The protocol guarantees that validators commit the same anchors in the same order,
742        // but they may do so in different chunks of anchors,
743        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
744        // Doing garbage collection at the end of each chunk (as we do here),
745        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
        // may give rise to a discrepancy between the DAGs of different validators who commit different chunks:
747        // one validator may have more certificates than the other, not yet garbage collected.
748        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
749        // it excludes certificates that are below the GC round,
750        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
751        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
752        // so long as garbage collection is done after each chunk.
753        // If garbage collection were done after each committed certificate,
754        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
755        self.storage().garbage_collect_certificates(latest_leader_round);
756
757        Ok(())
758    }
759
760    /// Returns the subdag of batch certificates to commit.
761    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
762        &self,
763        leader_certificate: BatchCertificate<N>,
764    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
765        // Initialize a map for the certificates to commit.
766        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
767        // Initialize a set for the already ordered certificates.
768        let mut already_ordered = HashSet::new();
769        // Initialize a buffer for the certificates to order.
770        let mut buffer = vec![leader_certificate];
771        // Iterate over the certificates to order.
772        while let Some(certificate) = buffer.pop() {
773            // Insert the certificate into the map.
774            commit.entry(certificate.round()).or_default().insert(certificate.clone());
775
776            // Check if the previous certificate is below the GC round.
777            // This is currently a critical check to prevent forking,
778            // as explained in the comment at the end of `commit_leader_certificate()`,
779            // just before the call to garbage collection.
780            let previous_round = certificate.round().saturating_sub(1);
781            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
782                continue;
783            }
784            // Iterate over the previous certificate IDs.
785            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
786            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
787            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
788                // If the previous certificate is already ordered, continue.
789                if already_ordered.contains(previous_certificate_id) {
790                    continue;
791                }
792                // If the previous certificate was recently committed, continue.
793                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
794                    continue;
795                }
796                // If the previous certificate already exists in the ledger, continue.
797                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
798                    continue;
799                }
800
801                // Retrieve the previous certificate.
802                let previous_certificate = {
803                    // Start by retrieving the previous certificate from the DAG.
804                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
805                        // If the previous certificate is found, return it.
806                        Some(previous_certificate) => previous_certificate,
807                        // If the previous certificate is not found, retrieve it from the storage.
808                        None => match self.storage().get_certificate(*previous_certificate_id) {
809                            // If the previous certificate is found, return it.
810                            Some(previous_certificate) => previous_certificate,
811                            // Otherwise, the previous certificate is missing, and throw an error.
812                            None => bail!(
813                                "Missing previous certificate {} for round {previous_round}",
814                                fmt_id(previous_certificate_id)
815                            ),
816                        },
817                    }
818                };
819                // Insert the previous certificate into the set of already ordered certificates.
820                already_ordered.insert(previous_certificate.id());
821                // Insert the previous certificate into the buffer.
822                buffer.push(previous_certificate);
823            }
824        }
825        // Ensure we only retain certificates that are above the GC round.
826        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
827        // Return the certificates to commit.
828        Ok(commit)
829    }
830
831    /// Returns `true` if there is a path from the previous certificate to the current certificate.
832    fn is_linked(
833        &self,
834        previous_certificate: BatchCertificate<N>,
835        current_certificate: BatchCertificate<N>,
836    ) -> Result<bool> {
837        // Initialize the list containing the traversal.
838        let mut traversal = vec![current_certificate.clone()];
839        // Iterate over the rounds from the current certificate to the previous certificate.
840        for round in (previous_certificate.round()..current_certificate.round()).rev() {
841            // Retrieve all of the certificates for this past round.
842            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
843                // This is a critical error, as the traversal should have these certificates.
844                // If this error is hit, it is likely that the maximum GC rounds should be increased.
845                bail!("BFT failed to retrieve the certificates for past round {round}");
846            };
847            // Filter the certificates to only include those that are in the traversal.
848            traversal = certificates
849                .into_values()
850                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
851                .collect();
852        }
853        Ok(traversal.contains(&previous_certificate))
854    }
855}
856
857impl<N: Network> BFT<N> {
858    /// Starts the BFT handlers.
859    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
860        let BFTReceiver {
861            mut rx_primary_round,
862            mut rx_primary_certificate,
863            mut rx_sync_bft_dag_at_bootup,
864            mut rx_sync_bft,
865        } = bft_receiver;
866
867        // Process the current round from the primary.
868        let self_ = self.clone();
869        self.spawn(async move {
870            while let Some((current_round, callback)) = rx_primary_round.recv().await {
871                callback.send(self_.update_to_next_round(current_round)).ok();
872            }
873        });
874
875        // Process the certificate from the primary.
876        let self_ = self.clone();
877        self.spawn(async move {
878            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
879                // Update the DAG with the certificate.
880                let result = self_.update_dag::<true, false>(certificate).await;
881                // Send the callback **after** updating the DAG.
882                // Note: We must await the DAG update before proceeding.
883                callback.send(result).ok();
884            }
885        });
886
887        // Process the request to sync the BFT DAG at bootup.
888        let self_ = self.clone();
889        self.spawn(async move {
890            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
891                self_.sync_bft_dag_at_bootup(certificates).await;
892            }
893        });
894
895        // Handler for new certificates that were fetched by the sync module.
896        let self_ = self.clone();
897        self.spawn(async move {
898            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
899                // Update the DAG with the certificate.
900                let result = self_.update_dag::<true, true>(certificate).await;
901                // Send the callback **after** updating the DAG.
902                // Note: We must await the DAG update before proceeding.
903                callback.send(result).ok();
904            }
905        });
906    }
907
908    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
909    /// already exist in the ledger.
910    ///
911    /// This method commits all the certificates into the DAG.
912    /// Note that there is no need to insert the certificates into the DAG, because these certificates
913    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
914    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
915        // Acquire the BFT write lock.
916        let mut dag = self.dag.write();
917
918        // Commit all the certificates.
919        for certificate in certificates {
920            dag.commit(&certificate, self.storage().max_gc_rounds());
921        }
922    }
923
924    /// Spawns a task with the given future; it should only be used for long-running tasks.
925    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
926        self.handles.lock().push(tokio::spawn(future));
927    }
928
929    /// Shuts down the BFT.
930    pub async fn shut_down(&self) {
931        info!("Shutting down the BFT...");
932        // Acquire the lock.
933        let _lock = self.lock.lock().await;
934        // Shut down the primary.
935        self.primary.shut_down().await;
936        // Abort the tasks.
937        self.handles.lock().iter().for_each(|handle| handle.abort());
938    }
939}
940
941#[cfg(test)]
942mod tests {
943    use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
944    use snarkos_account::Account;
945    use snarkos_node_bft_ledger_service::MockLedgerService;
946    use snarkos_node_bft_storage_service::BFTMemoryService;
947    use snarkos_node_sync::BlockSync;
948    use snarkvm::{
949        console::account::{Address, PrivateKey},
950        ledger::{
951            committee::Committee,
952            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
953        },
954        utilities::TestRng,
955    };
956
957    use aleo_std::StorageMode;
958    use anyhow::Result;
959    use indexmap::{IndexMap, IndexSet};
960    use std::sync::Arc;
961
962    type CurrentNetwork = snarkvm::console::network::MainnetV0;
963
964    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
965    fn sample_test_instance(
966        committee_round: Option<u64>,
967        max_gc_rounds: u64,
968        rng: &mut TestRng,
969    ) -> (
970        Committee<CurrentNetwork>,
971        Account<CurrentNetwork>,
972        Arc<MockLedgerService<CurrentNetwork>>,
973        Storage<CurrentNetwork>,
974    ) {
975        let committee = match committee_round {
976            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
977            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
978        };
979        let account = Account::new(rng).unwrap();
980        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
981        let transmissions = Arc::new(BFTMemoryService::new());
982        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
983
984        (committee, account, ledger, storage)
985    }
986
987    // Helper function to set up BFT for testing.
988    fn initialize_bft(
989        account: Account<CurrentNetwork>,
990        storage: Storage<CurrentNetwork>,
991        ledger: Arc<MockLedgerService<CurrentNetwork>>,
992    ) -> anyhow::Result<BFT<CurrentNetwork>> {
993        // Create the block synchronization logic.
994        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
995        // Initialize the BFT.
996        BFT::new(
997            account.clone(),
998            storage.clone(),
999            ledger.clone(),
1000            block_sync,
1001            None,
1002            &[],
1003            StorageMode::new_test(None),
1004            None,
1005        )
1006    }
1007
1008    #[test]
1009    #[tracing_test::traced_test]
1010    fn test_is_leader_quorum_odd() -> Result<()> {
1011        let rng = &mut TestRng::default();
1012
1013        // Sample batch certificates.
1014        let mut certificates = IndexSet::new();
1015        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1016        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1017        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1018        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1019
1020        // Initialize the committee.
1021        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1022            1,
1023            vec![
1024                certificates[0].author(),
1025                certificates[1].author(),
1026                certificates[2].author(),
1027                certificates[3].author(),
1028            ],
1029            rng,
1030        );
1031
1032        // Initialize the ledger.
1033        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1034        // Initialize the storage.
1035        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1036        // Initialize the account.
1037        let account = Account::new(rng)?;
1038        // Initialize the BFT.
1039        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1040        assert!(bft.is_timer_expired());
1041        // Ensure this call succeeds on an odd round.
1042        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1043        // If timer has expired but quorum threshold is not reached, return 'false'.
1044        assert!(!result);
1045        // Insert certificates into storage.
1046        for certificate in certificates.iter() {
1047            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1048        }
1049        // Ensure this call succeeds on an odd round.
1050        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1051        assert!(result); // no previous leader certificate
1052        // Set the leader certificate.
1053        let leader_certificate = sample_batch_certificate(rng);
1054        *bft.leader_certificate.write() = Some(leader_certificate);
1055        // Ensure this call succeeds on an odd round.
1056        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1057        assert!(result); // should now fall through to the end of function
1058
1059        Ok(())
1060    }
1061
1062    #[test]
1063    #[tracing_test::traced_test]
1064    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1065        let rng = &mut TestRng::default();
1066
1067        // Sample the test instance.
1068        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1069        assert_eq!(committee.starting_round(), 1);
1070        assert_eq!(storage.current_round(), 1);
1071        assert_eq!(storage.max_gc_rounds(), 10);
1072
1073        // Set up the BFT logic.
1074        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1075        assert!(bft.is_timer_expired());
1076
1077        // Store is at round 1, and we are checking for round 2.
1078        // Ensure this call fails on an even round.
1079        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1080        assert!(!result);
1081        Ok(())
1082    }
1083
1084    #[test]
1085    #[tracing_test::traced_test]
1086    fn test_is_leader_quorum_even() -> Result<()> {
1087        let rng = &mut TestRng::default();
1088
1089        // Sample the test instance.
1090        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1091        assert_eq!(committee.starting_round(), 2);
1092        assert_eq!(storage.current_round(), 2);
1093        assert_eq!(storage.max_gc_rounds(), 10);
1094
1095        // Set up the BFT logic.
1096        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1097        assert!(bft.is_timer_expired());
1098
1099        // Ensure this call fails on an even round.
1100        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1101        assert!(!result);
1102        Ok(())
1103    }
1104
1105    #[test]
1106    #[tracing_test::traced_test]
1107    fn test_is_even_round_ready() -> Result<()> {
1108        let rng = &mut TestRng::default();
1109
1110        // Sample batch certificates.
1111        let mut certificates = IndexSet::new();
1112        certificates.insert(sample_batch_certificate_for_round(2, rng));
1113        certificates.insert(sample_batch_certificate_for_round(2, rng));
1114        certificates.insert(sample_batch_certificate_for_round(2, rng));
1115        certificates.insert(sample_batch_certificate_for_round(2, rng));
1116
1117        // Initialize the committee.
1118        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1119            2,
1120            vec![
1121                certificates[0].author(),
1122                certificates[1].author(),
1123                certificates[2].author(),
1124                certificates[3].author(),
1125            ],
1126            rng,
1127        );
1128
1129        // Initialize the ledger.
1130        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1131        // Initialize the storage.
1132        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1133        // Initialize the account.
1134        let account = Account::new(rng)?;
1135
1136        // Set up the BFT logic.
1137        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1138        assert!(bft.is_timer_expired());
1139
1140        // Set the leader certificate.
1141        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1142        *bft.leader_certificate.write() = Some(leader_certificate);
1143        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1144        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1145        assert!(!result);
1146        // Once quorum threshold is reached, we are ready for the next round.
1147        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1148        assert!(result);
1149
1150        // Initialize a new BFT.
1151        let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1152        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1153        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1154        if !bft_timer.is_timer_expired() {
1155            assert!(!result);
1156        }
1157        // Wait for the timer to expire.
1158        let leader_certificate_timeout =
1159            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1160        std::thread::sleep(leader_certificate_timeout);
1161        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1162        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1163        if bft_timer.is_timer_expired() {
1164            assert!(result);
1165        } else {
1166            assert!(!result);
1167        }
1168
1169        Ok(())
1170    }
1171
1172    #[test]
1173    #[tracing_test::traced_test]
1174    fn test_update_leader_certificate_odd() -> Result<()> {
1175        let rng = &mut TestRng::default();
1176
1177        // Sample the test instance.
1178        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1179        assert_eq!(storage.max_gc_rounds(), 10);
1180
1181        // Initialize the BFT.
1182        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1183        assert!(bft.is_timer_expired());
1184
1185        // Ensure this call fails on an odd round.
1186        let result = bft.update_leader_certificate_to_even_round(1);
1187        assert!(!result);
1188        Ok(())
1189    }
1190
1191    #[test]
1192    #[tracing_test::traced_test]
1193    fn test_update_leader_certificate_bad_round() -> Result<()> {
1194        let rng = &mut TestRng::default();
1195
1196        // Sample the test instance.
1197        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1198        assert_eq!(storage.max_gc_rounds(), 10);
1199
1200        // Initialize the BFT.
1201        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1202
1203        // Ensure this call succeeds on an even round.
1204        let result = bft.update_leader_certificate_to_even_round(6);
1205        assert!(!result);
1206        Ok(())
1207    }
1208
1209    #[test]
1210    #[tracing_test::traced_test]
1211    fn test_update_leader_certificate_even() -> Result<()> {
1212        let rng = &mut TestRng::default();
1213
1214        // Set the current round.
1215        let current_round = 3;
1216
1217        // Sample the certificates.
1218        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1219            current_round,
1220            rng,
1221        );
1222
1223        // Initialize the committee.
1224        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1225            2,
1226            vec![
1227                certificates[0].author(),
1228                certificates[1].author(),
1229                certificates[2].author(),
1230                certificates[3].author(),
1231            ],
1232            rng,
1233        );
1234
1235        // Initialize the ledger.
1236        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1237
1238        // Initialize the storage.
1239        let transmissions = Arc::new(BFTMemoryService::new());
1240        let storage = Storage::new(ledger.clone(), transmissions, 10);
1241        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1242        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1243        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1244        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1245        assert_eq!(storage.current_round(), 2);
1246
1247        // Retrieve the leader certificate.
1248        let leader = committee.get_leader(2).unwrap();
1249        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1250
1251        // Initialize the BFT.
1252        let account = Account::new(rng)?;
1253        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1254
1255        // Set the leader certificate.
1256        *bft.leader_certificate.write() = Some(leader_certificate);
1257
1258        // Update the leader certificate.
1259        // Ensure this call succeeds on an even round.
1260        let result = bft.update_leader_certificate_to_even_round(2);
1261        assert!(result);
1262
1263        Ok(())
1264    }
1265
1266    #[tokio::test]
1267    #[tracing_test::traced_test]
1268    async fn test_order_dag_with_dfs() -> Result<()> {
1269        let rng = &mut TestRng::default();
1270
1271        // Sample the test instance.
1272        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1273
1274        // Initialize the round parameters.
1275        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1276        let current_round = previous_round + 1;
1277
1278        // Sample the current certificate and previous certificates.
1279        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1280            current_round,
1281            rng,
1282        );
1283
1284        /* Test GC */
1285
1286        // Ensure the function succeeds in returning only certificates above GC.
1287        {
1288            // Initialize the storage.
1289            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1290            // Initialize the BFT.
1291            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1292
1293            // Insert a mock DAG in the BFT.
1294            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1295
1296            // Insert the previous certificates into the BFT.
1297            for certificate in previous_certificates.clone() {
1298                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1299            }
1300
1301            // Ensure this call succeeds and returns all given certificates.
1302            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1303            assert!(result.is_ok());
1304            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1305            assert_eq!(candidate_certificates.len(), 1);
1306            let expected_certificates = vec![certificate.clone()];
1307            assert_eq!(
1308                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1309                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1310            );
1311            assert_eq!(candidate_certificates, expected_certificates);
1312        }
1313
1314        /* Test normal case */
1315
1316        // Ensure the function succeeds in returning all given certificates.
1317        {
1318            // Initialize the storage.
1319            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1320            // Initialize the BFT.
1321            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1322
1323            // Insert a mock DAG in the BFT.
1324            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1325
1326            // Insert the previous certificates into the BFT.
1327            for certificate in previous_certificates.clone() {
1328                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1329            }
1330
1331            // Ensure this call succeeds and returns all given certificates.
1332            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1333            assert!(result.is_ok());
1334            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1335            assert_eq!(candidate_certificates.len(), 5);
1336            let expected_certificates = vec![
1337                previous_certificates[0].clone(),
1338                previous_certificates[1].clone(),
1339                previous_certificates[2].clone(),
1340                previous_certificates[3].clone(),
1341                certificate,
1342            ];
1343            assert_eq!(
1344                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1345                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1346            );
1347            assert_eq!(candidate_certificates, expected_certificates);
1348        }
1349
1350        Ok(())
1351    }
1352
1353    #[test]
1354    #[tracing_test::traced_test]
1355    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1356        let rng = &mut TestRng::default();
1357
1358        // Sample the test instance.
1359        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1360        assert_eq!(committee.starting_round(), 1);
1361        assert_eq!(storage.current_round(), 1);
1362        assert_eq!(storage.max_gc_rounds(), 1);
1363
1364        // Initialize the round parameters.
1365        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1366        let current_round = previous_round + 1;
1367
1368        // Sample the current certificate and previous certificates.
1369        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1370            current_round,
1371            rng,
1372        );
1373        // Construct the previous certificate IDs.
1374        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1375
1376        /* Test missing previous certificate. */
1377
1378        // Initialize the BFT.
1379        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1380
1381        // The expected error message.
1382        let error_msg = format!(
1383            "Missing previous certificate {} for round {previous_round}",
1384            crate::helpers::fmt_id(previous_certificate_ids[3]),
1385        );
1386
1387        // Ensure this call fails on a missing previous certificate.
1388        let result = bft.order_dag_with_dfs::<false>(certificate);
1389        assert!(result.is_err());
1390        assert_eq!(result.unwrap_err().to_string(), error_msg);
1391        Ok(())
1392    }
1393
1394    #[tokio::test]
1395    #[tracing_test::traced_test]
1396    async fn test_bft_gc_on_commit() -> Result<()> {
1397        let rng = &mut TestRng::default();
1398
1399        // Initialize the round parameters.
1400        let max_gc_rounds = 1;
1401        let committee_round = 0;
1402        let commit_round = 2;
1403        let current_round = commit_round + 1;
1404
1405        // Sample the certificates.
1406        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1407            current_round,
1408            rng,
1409        );
1410
1411        // Initialize the committee.
1412        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1413            committee_round,
1414            vec![
1415                certificates[0].author(),
1416                certificates[1].author(),
1417                certificates[2].author(),
1418                certificates[3].author(),
1419            ],
1420            rng,
1421        );
1422
1423        // Initialize the ledger.
1424        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1425
1426        // Initialize the storage.
1427        let transmissions = Arc::new(BFTMemoryService::new());
1428        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1429        // Insert the certificates into the storage.
1430        for certificate in certificates.iter() {
1431            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1432        }
1433
1434        // Get the leader certificate.
1435        let leader = committee.get_leader(commit_round).unwrap();
1436        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1437
1438        // Initialize the BFT.
1439        let account = Account::new(rng)?;
1440        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1441
1442        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1443
1444        // Ensure that the `gc_round` has not been updated yet.
1445        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1446
1447        // Insert the certificates into the BFT.
1448        for certificate in certificates {
1449            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1450        }
1451
1452        // Commit the leader certificate.
1453        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1454
1455        // Ensure that the `gc_round` has been updated.
1456        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1457
1458        Ok(())
1459    }
1460
1461    #[tokio::test]
1462    #[tracing_test::traced_test]
1463    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1464        let rng = &mut TestRng::default();
1465
1466        // Initialize the round parameters.
1467        let max_gc_rounds = 1;
1468        let committee_round = 0;
1469        let commit_round = 2;
1470        let current_round = commit_round + 1;
1471
1472        // Sample the current certificate and previous certificates.
1473        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1474            current_round,
1475            rng,
1476        );
1477
1478        // Initialize the committee.
1479        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1480            committee_round,
1481            vec![
1482                certificates[0].author(),
1483                certificates[1].author(),
1484                certificates[2].author(),
1485                certificates[3].author(),
1486            ],
1487            rng,
1488        );
1489
1490        // Initialize the ledger.
1491        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1492
1493        // Initialize the storage.
1494        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1495        // Insert the certificates into the storage.
1496        for certificate in certificates.iter() {
1497            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1498        }
1499
1500        // Get the leader certificate.
1501        let leader = committee.get_leader(commit_round).unwrap();
1502        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1503
1504        // Initialize the BFT.
1505        let account = Account::new(rng)?;
1506        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1507
1508        // Insert a mock DAG in the BFT.
1509        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1510
1511        // Insert the previous certificates into the BFT.
1512        for certificate in certificates.clone() {
1513            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1514        }
1515
1516        // Commit the leader certificate.
1517        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1518
1519        // Simulate a bootup of the BFT.
1520
1521        // Initialize a new instance of storage.
1522        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1523        // Initialize a new instance of BFT.
1524        let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1525
1526        // Sync the BFT DAG at bootup.
1527        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1528
1529        // Check that the BFT starts from the same last committed round.
1530        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1531
1532        // Ensure that both BFTs have committed the leader certificate.
1533        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1534        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1535
1536        // Check the state of the bootup BFT.
1537        for certificate in certificates {
1538            let certificate_round = certificate.round();
1539            let certificate_id = certificate.id();
1540            // Check that the bootup BFT has committed the certificates.
1541            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1542            // Check that the bootup BFT does not contain the certificates in its graph, because
1543            // it should not need to order them again in subsequent subdags.
1544            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1545        }
1546
1547        Ok(())
1548    }
1549
1550    #[tokio::test]
1551    #[tracing_test::traced_test]
1552    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1553        /*
1554        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1555        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1556        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1557        */
1558
1559        let rng = &mut TestRng::default();
1560
1561        // Initialize the round parameters.
1562        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1563        let committee_round = 0;
1564        let commit_round = 2;
1565        let current_round = commit_round + 1;
1566        let next_round = current_round + 1;
1567
1568        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1569        let (round_to_certificates_map, committee) = {
1570            let private_keys = vec![
1571                PrivateKey::new(rng).unwrap(),
1572                PrivateKey::new(rng).unwrap(),
1573                PrivateKey::new(rng).unwrap(),
1574                PrivateKey::new(rng).unwrap(),
1575            ];
1576            let addresses = vec![
1577                Address::try_from(private_keys[0])?,
1578                Address::try_from(private_keys[1])?,
1579                Address::try_from(private_keys[2])?,
1580                Address::try_from(private_keys[3])?,
1581            ];
1582            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1583                committee_round,
1584                addresses,
1585                rng,
1586            );
1587            // Initialize a mapping from the round number to the set of batch certificates in the round.
1588            let mut round_to_certificates_map: IndexMap<
1589                u64,
1590                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1591            > = IndexMap::new();
1592            let mut previous_certificates = IndexSet::with_capacity(4);
1593            // Initialize the genesis batch certificates.
1594            for _ in 0..4 {
1595                previous_certificates.insert(sample_batch_certificate(rng));
1596            }
1597            for round in 0..commit_round + 3 {
1598                let mut current_certificates = IndexSet::new();
1599                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1600                    IndexSet::new()
1601                } else {
1602                    previous_certificates.iter().map(|c| c.id()).collect()
1603                };
1604                let transmission_ids =
1605                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1606                        .into_iter()
1607                        .collect::<IndexSet<_>>();
1608                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1609                let committee_id = committee.id();
1610                for (i, private_key_1) in private_keys.iter().enumerate() {
1611                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1612                        private_key_1,
1613                        round,
1614                        timestamp,
1615                        committee_id,
1616                        transmission_ids.clone(),
1617                        previous_certificate_ids.clone(),
1618                        rng,
1619                    )
1620                    .unwrap();
1621                    let mut signatures = IndexSet::with_capacity(4);
1622                    for (j, private_key_2) in private_keys.iter().enumerate() {
1623                        if i != j {
1624                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1625                        }
1626                    }
1627                    let certificate =
1628                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1629                    current_certificates.insert(certificate);
1630                }
1631                // Update the mapping.
1632                round_to_certificates_map.insert(round, current_certificates.clone());
1633                previous_certificates = current_certificates.clone();
1634            }
1635            (round_to_certificates_map, committee)
1636        };
1637
1638        // Initialize the ledger.
1639        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1640        // Initialize the storage.
1641        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1642        // Get the leaders for the next 2 commit rounds.
1643        let leader = committee.get_leader(commit_round).unwrap();
1644        let next_leader = committee.get_leader(next_round).unwrap();
1645        // Insert the pre shutdown certificates into the storage.
1646        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1647        for i in 1..=commit_round {
1648            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1649            if i == commit_round {
1650                // Only insert the leader certificate for the commit round.
1651                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1652                if let Some(c) = leader_certificate {
1653                    pre_shutdown_certificates.push(c.clone());
1654                }
1655                continue;
1656            }
1657            pre_shutdown_certificates.extend(certificates);
1658        }
1659        for certificate in pre_shutdown_certificates.iter() {
1660            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1661        }
1662        // Insert the post shutdown certificates into the storage.
1663        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1664            Vec::new();
1665        for j in commit_round..=commit_round + 2 {
1666            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1667            post_shutdown_certificates.extend(certificate);
1668        }
1669        for certificate in post_shutdown_certificates.iter() {
1670            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1671        }
1672        // Get the leader certificates.
1673        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1674        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1675
1676        // Initialize the BFT without bootup.
1677        let account = Account::new(rng)?;
1678        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1679
1680        // Insert a mock DAG in the BFT without bootup.
1681        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1682
1683        // Insert the certificates into the BFT without bootup.
1684        for certificate in pre_shutdown_certificates.clone() {
1685            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1686        }
1687
1688        // Insert the post shutdown certificates into the BFT without bootup.
1689        for certificate in post_shutdown_certificates.clone() {
1690            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1691        }
1692        // Commit the second leader certificate.
1693        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1694        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1695        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1696
1697        // Simulate a bootup of the BFT.
1698
1699        // Initialize a new instance of storage.
1700        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1701
1702        // Initialize a new instance of BFT with bootup.
1703        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1704
1705        // Sync the BFT DAG at bootup.
1706        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1707
1708        // Insert the post shutdown certificates to the storage and BFT with bootup.
1709        for certificate in post_shutdown_certificates.iter() {
1710            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1711        }
1712        for certificate in post_shutdown_certificates.clone() {
1713            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1714        }
1715        // Commit the second leader certificate.
1716        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1717        let commit_subdag_metadata_bootup =
1718            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1719        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1720        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1721
1722        // Check that the final state of both BFTs is the same.
1723
1724        // Check that both BFTs start from the same last committed round.
1725        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1726
1727        // Ensure that both BFTs have committed the leader certificates.
1728        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1729        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1730        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1731        assert!(
1732            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1733        );
1734
1735        // Check that the bootup BFT has committed the pre shutdown certificates.
1736        for certificate in pre_shutdown_certificates.clone() {
1737            let certificate_round = certificate.round();
1738            let certificate_id = certificate.id();
1739            // Check that both BFTs have committed the certificates.
1740            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1741            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1742            // Check that the bootup BFT does not contain the certificates in its graph, because
1743            // it should not need to order them again in subsequent subdags.
1744            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1745            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1746        }
1747
1748        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1749        for certificate in committed_certificates_bootup.clone() {
1750            let certificate_round = certificate.round();
1751            let certificate_id = certificate.id();
1752            // Check that the both BFTs have committed the certificates.
1753            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1755            // Check that the bootup BFT does not contain the certificates in its graph, because
1756            // it should not need to order them again in subsequent subdags.
1757            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1759        }
1760
1761        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1762        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1763
1764        Ok(())
1765    }
1766
1767    #[tokio::test]
1768    #[tracing_test::traced_test]
1769    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1770        /*
1771        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1772        2. Add post shutdown certificates to the bootup BFT.
1773        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1774        */
1775
1776        let rng = &mut TestRng::default();
1777
1778        // Initialize the round parameters.
1779        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1780        let committee_round = 0;
1781        let commit_round = 2;
1782        let current_round = commit_round + 1;
1783        let next_round = current_round + 1;
1784
1785        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1786        let (round_to_certificates_map, committee) = {
1787            let private_keys = vec![
1788                PrivateKey::new(rng).unwrap(),
1789                PrivateKey::new(rng).unwrap(),
1790                PrivateKey::new(rng).unwrap(),
1791                PrivateKey::new(rng).unwrap(),
1792            ];
1793            let addresses = vec![
1794                Address::try_from(private_keys[0])?,
1795                Address::try_from(private_keys[1])?,
1796                Address::try_from(private_keys[2])?,
1797                Address::try_from(private_keys[3])?,
1798            ];
1799            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1800                committee_round,
1801                addresses,
1802                rng,
1803            );
1804            // Initialize a mapping from the round number to the set of batch certificates in the round.
1805            let mut round_to_certificates_map: IndexMap<
1806                u64,
1807                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1808            > = IndexMap::new();
1809            let mut previous_certificates = IndexSet::with_capacity(4);
1810            // Initialize the genesis batch certificates.
1811            for _ in 0..4 {
1812                previous_certificates.insert(sample_batch_certificate(rng));
1813            }
1814            for round in 0..=commit_round + 2 {
1815                let mut current_certificates = IndexSet::new();
1816                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1817                    IndexSet::new()
1818                } else {
1819                    previous_certificates.iter().map(|c| c.id()).collect()
1820                };
1821                let transmission_ids =
1822                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1823                        .into_iter()
1824                        .collect::<IndexSet<_>>();
1825                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1826                let committee_id = committee.id();
1827                for (i, private_key_1) in private_keys.iter().enumerate() {
1828                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1829                        private_key_1,
1830                        round,
1831                        timestamp,
1832                        committee_id,
1833                        transmission_ids.clone(),
1834                        previous_certificate_ids.clone(),
1835                        rng,
1836                    )
1837                    .unwrap();
1838                    let mut signatures = IndexSet::with_capacity(4);
1839                    for (j, private_key_2) in private_keys.iter().enumerate() {
1840                        if i != j {
1841                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1842                        }
1843                    }
1844                    let certificate =
1845                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1846                    current_certificates.insert(certificate);
1847                }
1848                // Update the mapping.
1849                round_to_certificates_map.insert(round, current_certificates.clone());
1850                previous_certificates = current_certificates.clone();
1851            }
1852            (round_to_certificates_map, committee)
1853        };
1854
1855        // Initialize the ledger.
1856        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1857        // Initialize the storage.
1858        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1859        // Get the leaders for the next 2 commit rounds.
1860        let leader = committee.get_leader(commit_round).unwrap();
1861        let next_leader = committee.get_leader(next_round).unwrap();
1862        // Insert the pre shutdown certificates into the storage.
1863        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1864        for i in 1..=commit_round {
1865            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1866            if i == commit_round {
1867                // Only insert the leader certificate for the commit round.
1868                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1869                if let Some(c) = leader_certificate {
1870                    pre_shutdown_certificates.push(c.clone());
1871                }
1872                continue;
1873            }
1874            pre_shutdown_certificates.extend(certificates);
1875        }
1876        for certificate in pre_shutdown_certificates.iter() {
1877            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1878        }
1879        // Initialize the bootup BFT.
1880        let account = Account::new(rng)?;
1881        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1882
1883        // Insert a mock DAG in the BFT without bootup.
1884        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1885        // Sync the BFT DAG at bootup.
1886        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1887
1888        // Insert the post shutdown certificates into the storage.
1889        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1890            Vec::new();
1891        for j in commit_round..=commit_round + 2 {
1892            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1893            post_shutdown_certificates.extend(certificate);
1894        }
1895        for certificate in post_shutdown_certificates.iter() {
1896            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1897        }
1898
1899        // Insert the post shutdown certificates into the DAG.
1900        for certificate in post_shutdown_certificates.clone() {
1901            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1902        }
1903
1904        // Get the next leader certificate to commit.
1905        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1906        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1907        let committed_certificates = commit_subdag.values().flatten();
1908
1909        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1910        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1911            for committed_certificate in committed_certificates.clone() {
1912                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1913            }
1914        }
1915        Ok(())
1916    }
1917}