snarkos_node_bft/
bft.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkos_node_sync::{BlockSync, Ping};
34use snarkvm::{
35    console::account::Address,
36    ledger::{
37        block::Transaction,
38        committee::Committee,
39        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
40        puzzle::{Solution, SolutionID},
41    },
42    prelude::{Field, Network, Result, bail, ensure},
43};
44
45use aleo_std::StorageMode;
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50    parking_lot::{Mutex, RwLock},
51    tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56    collections::{BTreeMap, HashSet},
57    future::Future,
58    net::SocketAddr,
59    sync::{
60        Arc,
61        atomic::{AtomicI64, Ordering},
62    },
63};
64#[cfg(not(feature = "locktick"))]
65use tokio::sync::Mutex as TMutex;
66use tokio::{
67    sync::{OnceCell, oneshot},
68    task::JoinHandle,
69};
70
71#[derive(Clone)]
72pub struct BFT<N: Network> {
73    /// The primary for this node.
74    primary: Primary<N>,
75    /// The DAG of batches from which we build the blockchain.
76    dag: Arc<RwLock<DAG<N>>>,
77    /// The batch certificate of the leader from the current even round, if one was present.
78    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
79    /// The timer for the leader certificate to be received.
80    leader_certificate_timer: Arc<AtomicI64>,
81    /// The consensus sender.
82    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
83    /// Handles for all spawned tasks.
84    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
85    /// The BFT lock.
86    lock: Arc<TMutex<()>>,
87}
88
89impl<N: Network> BFT<N> {
90    /// Initializes a new instance of the BFT.
91    #[allow(clippy::too_many_arguments)]
92    pub fn new(
93        account: Account<N>,
94        storage: Storage<N>,
95        ledger: Arc<dyn LedgerService<N>>,
96        block_sync: Arc<BlockSync<N>>,
97        ip: Option<SocketAddr>,
98        trusted_validators: &[SocketAddr],
99        trusted_peers_only: bool,
100        storage_mode: StorageMode,
101        dev: Option<u16>,
102    ) -> Result<Self> {
103        Ok(Self {
104            primary: Primary::new(
105                account,
106                storage,
107                ledger,
108                block_sync,
109                ip,
110                trusted_validators,
111                trusted_peers_only,
112                storage_mode,
113                dev,
114            )?,
115            dag: Default::default(),
116            leader_certificate: Default::default(),
117            leader_certificate_timer: Default::default(),
118            consensus_sender: Default::default(),
119            handles: Default::default(),
120            lock: Default::default(),
121        })
122    }
123
124    /// Run the BFT instance.
125    ///
126    /// This will return as soon as all required tasks are spawned.
127    /// The function must not be called more than once per instance.
128    pub async fn run(
129        &mut self,
130        ping: Option<Arc<Ping<N>>>,
131        consensus_sender: Option<ConsensusSender<N>>,
132        primary_sender: PrimarySender<N>,
133        primary_receiver: PrimaryReceiver<N>,
134    ) -> Result<()> {
135        info!("Starting the BFT instance...");
136        // Initialize the BFT channels.
137        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
138        // First, start the BFT handlers.
139        self.start_handlers(bft_receiver);
140        // Next, run the primary instance.
141        self.primary.run(ping, Some(bft_sender), primary_sender, primary_receiver).await?;
142        // Lastly, set the consensus sender.
143        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
144        if let Some(consensus_sender) = consensus_sender {
145            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
146        }
147        Ok(())
148    }
149
150    /// Returns `true` if the primary is synced.
151    pub fn is_synced(&self) -> bool {
152        self.primary.is_synced()
153    }
154
155    /// Returns the primary.
156    pub const fn primary(&self) -> &Primary<N> {
157        &self.primary
158    }
159
160    /// Returns the storage.
161    pub const fn storage(&self) -> &Storage<N> {
162        self.primary.storage()
163    }
164
165    /// Returns the ledger.
166    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
167        self.primary.ledger()
168    }
169
170    /// Returns the leader of the current even round, if one was present.
171    pub fn leader(&self) -> Option<Address<N>> {
172        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
173    }
174
175    /// Returns the certificate of the leader from the current even round, if one was present.
176    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
177        &self.leader_certificate
178    }
179}
180
181impl<N: Network> BFT<N> {
182    /// Returns the number of unconfirmed transmissions.
183    pub fn num_unconfirmed_transmissions(&self) -> usize {
184        self.primary.num_unconfirmed_transmissions()
185    }
186
187    /// Returns the number of unconfirmed ratifications.
188    pub fn num_unconfirmed_ratifications(&self) -> usize {
189        self.primary.num_unconfirmed_ratifications()
190    }
191
192    /// Returns the number of solutions.
193    pub fn num_unconfirmed_solutions(&self) -> usize {
194        self.primary.num_unconfirmed_solutions()
195    }
196
197    /// Returns the number of unconfirmed transactions.
198    pub fn num_unconfirmed_transactions(&self) -> usize {
199        self.primary.num_unconfirmed_transactions()
200    }
201}
202
203impl<N: Network> BFT<N> {
204    /// Returns the worker transmission IDs.
205    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
206        self.primary.worker_transmission_ids()
207    }
208
209    /// Returns the worker transmissions.
210    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
211        self.primary.worker_transmissions()
212    }
213
214    /// Returns the worker solutions.
215    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
216        self.primary.worker_solutions()
217    }
218
219    /// Returns the worker transactions.
220    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
221        self.primary.worker_transactions()
222    }
223}
224
225impl<N: Network> BFT<N> {
226    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
227    fn update_to_next_round(&self, current_round: u64) -> bool {
228        // Ensure the current round is at least the storage round (this is a sanity check).
229        let storage_round = self.storage().current_round();
230        if current_round < storage_round {
231            debug!(
232                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
233            );
234            return false;
235        }
236
237        // Determine if the BFT is ready to update to the next round.
238        let is_ready = match current_round % 2 == 0 {
239            true => self.update_leader_certificate_to_even_round(current_round),
240            false => self.is_leader_quorum_or_nonleaders_available(current_round),
241        };
242
243        #[cfg(feature = "metrics")]
244        {
245            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
246            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
247            if start > 0 {
248                let end = now();
249                let elapsed = std::time::Duration::from_secs((end - start) as u64);
250                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
251            }
252        }
253
254        // Log whether the round is going to update.
255        if current_round % 2 == 0 {
256            // Determine if there is a leader certificate.
257            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
258                // Ensure the state of the leader certificate is consistent with the BFT being ready.
259                if !is_ready {
260                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
261                }
262                // Log the leader election.
263                let leader_round = leader_certificate.round();
264                match leader_round == current_round {
265                    true => {
266                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
267                        #[cfg(feature = "metrics")]
268                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
269                    }
270                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
271                }
272            } else {
273                match is_ready {
274                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
275                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader (yet)\n").dimmed()),
276                }
277            }
278        }
279
280        // If the BFT is ready, then update to the next round.
281        if is_ready {
282            // Update to the next round in storage.
283            if let Err(e) = self.storage().increment_to_next_round(current_round) {
284                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
285                return false;
286            }
287            // Update the timer for the leader certificate.
288            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
289        }
290
291        is_ready
292    }
293
294    /// Updates the leader certificate to the current even round,
295    /// returning `true` if the BFT is ready to update to the next round.
296    ///
297    /// This method runs on every even round, by determining the leader of the current even round,
298    /// and setting the leader certificate to their certificate in the round, if they were present.
299    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
300        // Retrieve the current round.
301        let current_round = self.storage().current_round();
302        // Ensure the current round matches the given round.
303        if current_round != even_round {
304            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
305            return false;
306        }
307
308        // If the current round is odd, return false.
309        if current_round % 2 != 0 || current_round < 2 {
310            error!("BFT cannot update the leader certificate in an odd round");
311            return false;
312        }
313
314        // Retrieve the certificates for the current round.
315        let current_certificates = self.storage().get_certificates_for_round(current_round);
316        // If there are no current certificates, set the leader certificate to 'None', and return early.
317        if current_certificates.is_empty() {
318            // Set the leader certificate to 'None'.
319            *self.leader_certificate.write() = None;
320            return false;
321        }
322
323        // Retrieve the committee lookback of the current round.
324        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
325            Ok(committee) => committee,
326            Err(e) => {
327                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
328                return false;
329            }
330        };
331        // Determine the leader of the current round.
332        let leader = match self.ledger().latest_leader() {
333            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
334            _ => {
335                // Compute the leader for the current round.
336                let computed_leader = match committee_lookback.get_leader(current_round) {
337                    Ok(leader) => leader,
338                    Err(e) => {
339                        error!("BFT failed to compute the leader for the even round {current_round} - {e}");
340                        return false;
341                    }
342                };
343
344                // Cache the computed leader.
345                self.ledger().update_latest_leader(current_round, computed_leader);
346
347                computed_leader
348            }
349        };
350        // Find and set the leader certificate, if the leader was present in the current even round.
351        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
352        *self.leader_certificate.write() = leader_certificate.cloned();
353
354        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
355    }
356
357    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
358    ///  - If the leader certificate is set for the current even round.
359    ///  - The timer for the leader certificate has expired.
360    fn is_even_round_ready_for_next_round(
361        &self,
362        certificates: IndexSet<BatchCertificate<N>>,
363        committee: Committee<N>,
364        current_round: u64,
365    ) -> bool {
366        // Retrieve the authors for the current round.
367        let authors = certificates.into_iter().map(|c| c.author()).collect();
368        // Check if quorum threshold is reached.
369        if !committee.is_quorum_threshold_reached(&authors) {
370            trace!("BFT failed to reach quorum threshold in even round {current_round}");
371            return false;
372        }
373        // If the leader certificate is set for the current even round, return 'true'.
374        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
375            if leader_certificate.round() == current_round {
376                return true;
377            }
378        }
379        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
380        if self.is_timer_expired() {
381            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
382            return true;
383        }
384        // Otherwise, return 'false'.
385        false
386    }
387
388    /// Returns `true` if the timer for the leader certificate has expired.
389    ///
390    /// This is always true for a new BFT instance.
391    fn is_timer_expired(&self) -> bool {
392        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
393    }
394
395    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
396    ///  - The leader certificate is `None`.
397    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
398    ///  - The leader certificate timer has expired.
399    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
400        // Retrieve the current round.
401        let current_round = self.storage().current_round();
402        // Ensure the current round matches the given round.
403        if current_round != odd_round {
404            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
405            return false;
406        }
407        // If the current round is even, return false.
408        if current_round % 2 != 1 {
409            error!("BFT does not compute stakes for the leader certificate in an even round");
410            return false;
411        }
412        // Retrieve the certificates for the current round.
413        let current_certificates = self.storage().get_certificates_for_round(current_round);
414        // Retrieve the committee lookback for the current round.
415        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
416            Ok(committee) => committee,
417            Err(e) => {
418                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
419                return false;
420            }
421        };
422        // Retrieve the authors of the current certificates.
423        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
424        // Check if quorum threshold is reached.
425        if !committee_lookback.is_quorum_threshold_reached(&authors) {
426            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
427            return false;
428        }
429        // Retrieve the leader certificate.
430        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
431            // If there is no leader certificate for the previous round, return 'true'.
432            return true;
433        };
434        // Compute the stake for the leader certificate.
435        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
436            leader_certificate.id(),
437            current_certificates,
438            &committee_lookback,
439        );
440        // Return 'true' if any of the following conditions hold:
441        stake_with_leader >= committee_lookback.availability_threshold()
442            || stake_without_leader >= committee_lookback.quorum_threshold()
443            || self.is_timer_expired()
444    }
445
446    /// Computes the amount of stake that has & has not signed for the leader certificate.
447    fn compute_stake_for_leader_certificate(
448        &self,
449        leader_certificate_id: Field<N>,
450        current_certificates: IndexSet<BatchCertificate<N>>,
451        current_committee: &Committee<N>,
452    ) -> (u64, u64) {
453        // If there are no current certificates, return early.
454        if current_certificates.is_empty() {
455            return (0, 0);
456        }
457
458        // Initialize a tracker for the stake with the leader.
459        let mut stake_with_leader = 0u64;
460        // Initialize a tracker for the stake without the leader.
461        let mut stake_without_leader = 0u64;
462        // Iterate over the current certificates.
463        for certificate in current_certificates {
464            // Retrieve the stake for the author of the certificate.
465            let stake = current_committee.get_stake(certificate.author());
466            // Determine if the certificate includes the leader.
467            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
468                // If the certificate includes the leader, add the stake to the stake with the leader.
469                true => stake_with_leader = stake_with_leader.saturating_add(stake),
470                // If the certificate does not include the leader, add the stake to the stake without the leader.
471                false => stake_without_leader = stake_without_leader.saturating_add(stake),
472            }
473        }
474        // Return the stake with the leader, and the stake without the leader.
475        (stake_with_leader, stake_without_leader)
476    }
477}
478
479impl<N: Network> BFT<N> {
480    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
481    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
482        &self,
483        certificate: BatchCertificate<N>,
484    ) -> Result<()> {
485        // Acquire the BFT lock.
486        let _lock = self.lock.lock().await;
487
488        // Retrieve the round of the new certificate to add to the DAG.
489        let certificate_round = certificate.round();
490
491        // Insert the certificate into the DAG.
492        self.dag.write().insert(certificate);
493
494        // Get the previous round number.
495        let commit_round = certificate_round.saturating_sub(1);
496
497        // Leaders are elected in even rounds.
498        // If the previous round is odd, the current round cannot commit any leader certs.
499        if commit_round % 2 != 0 || commit_round < 2 {
500            return Ok(());
501        }
502        // If the commit round is at or below the last committed round, return early.
503        if commit_round <= self.dag.read().last_committed_round() {
504            return Ok(());
505        }
506
507        /* Proceeding to check if the leader is ready to be committed. */
508        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
509
510        // Retrieve the committee lookback for the commit round.
511        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
512            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
513        };
514
515        // Either retrieve the cached leader or compute it.
516        let leader = match self.ledger().latest_leader() {
517            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
518            _ => {
519                // Compute the leader for the commit round.
520                let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
521                    bail!("BFT failed to compute the leader for commit round {commit_round}");
522                };
523
524                // Cache the computed leader.
525                self.ledger().update_latest_leader(commit_round, computed_leader);
526
527                computed_leader
528            }
529        };
530
531        // Retrieve the leader certificate for the commit round.
532        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
533        else {
534            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
535            return Ok(());
536        };
537        // Retrieve all of the certificates for the **certificate** round.
538        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
539            // TODO (howardwu): Investigate how many certificates we should have at this point.
540            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
541        };
542        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
543        let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
544        else {
545            bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
546        };
547        // Construct a set over the authors who included the leader's certificate in the certificate round.
548        let authors = certificates
549            .values()
550            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
551                true => Some(c.author()),
552                false => None,
553            })
554            .collect();
555        // Check if the leader is ready to be committed.
556        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
557            // If the leader is not ready to be committed, return early.
558            trace!("BFT is not ready to commit {commit_round}");
559            return Ok(());
560        }
561
562        /* Proceeding to commit the leader. */
563        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
564
565        // Commit the leader certificate, and all previous leader certificates since the last committed round.
566        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
567    }
568
569    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
570    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
571        &self,
572        leader_certificate: BatchCertificate<N>,
573    ) -> Result<()> {
574        // Fetch the leader round.
575        let latest_leader_round = leader_certificate.round();
576        // Determine the list of all previous leader certificates since the last committed round.
577        // The order of the leader certificates is from **newest** to **oldest**.
578        let mut leader_certificates = vec![leader_certificate.clone()];
579        {
580            // Retrieve the leader round.
581            let leader_round = leader_certificate.round();
582
583            let mut current_certificate = leader_certificate;
584            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
585            {
586                // Retrieve the previous committee for the leader round.
587                let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
588                    Ok(committee) => committee,
589                    Err(e) => {
590                        bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
591                    }
592                };
593                // Either retrieve the cached leader or compute it.
594                let leader = match self.ledger().latest_leader() {
595                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
596                    _ => {
597                        // Compute the leader for the commit round.
598                        let computed_leader = match previous_committee_lookback.get_leader(round) {
599                            Ok(leader) => leader,
600                            Err(e) => {
601                                bail!("BFT failed to compute the leader for the even round {round} - {e}");
602                            }
603                        };
604
605                        // Cache the computed leader.
606                        self.ledger().update_latest_leader(round, computed_leader);
607
608                        computed_leader
609                    }
610                };
611                // Retrieve the previous leader certificate.
612                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
613                else {
614                    continue;
615                };
616                // Determine if there is a path between the previous certificate and the current certificate.
617                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
618                    // Add the previous leader certificate to the list of certificates to commit.
619                    leader_certificates.push(previous_certificate.clone());
620                    // Update the current certificate to the previous leader certificate.
621                    current_certificate = previous_certificate;
622                }
623            }
624        }
625
626        // Iterate over the leader certificates to commit.
627        for leader_certificate in leader_certificates.into_iter().rev() {
628            // Retrieve the leader certificate round.
629            let leader_round = leader_certificate.round();
630            // Compute the commit subdag.
631            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
632                Ok(subdag) => subdag,
633                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
634            };
635            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
636            if !IS_SYNCING {
637                // Initialize a map for the deduped transmissions.
638                let mut transmissions = IndexMap::new();
639                // Initialize a map for the deduped transaction ids.
640                let mut seen_transaction_ids = IndexSet::new();
641                // Initialize a map for the deduped solution ids.
642                let mut seen_solution_ids = IndexSet::new();
643                // Start from the oldest leader certificate.
644                for certificate in commit_subdag.values().flatten() {
645                    // Retrieve the transmissions.
646                    for transmission_id in certificate.transmission_ids() {
647                        // If the transaction ID or solution ID already exists in the map, skip it.
648                        // Note: This additional check is done to ensure that we do not include duplicate
649                        // transaction IDs or solution IDs that may have a different transmission ID.
650                        match transmission_id {
651                            TransmissionID::Solution(solution_id, _) => {
652                                // If the solution already exists, skip it.
653                                if seen_solution_ids.contains(&solution_id) {
654                                    continue;
655                                }
656                            }
657                            TransmissionID::Transaction(transaction_id, _) => {
658                                // If the transaction already exists, skip it.
659                                if seen_transaction_ids.contains(transaction_id) {
660                                    continue;
661                                }
662                            }
663                            TransmissionID::Ratification => {
664                                bail!("Ratifications are currently not supported in the BFT.")
665                            }
666                        }
667                        // If the transmission already exists in the map, skip it.
668                        if transmissions.contains_key(transmission_id) {
669                            continue;
670                        }
671                        // If the transmission already exists in the ledger, skip it.
672                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
673                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
674                            continue;
675                        }
676                        // Retrieve the transmission.
677                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
678                            bail!(
679                                "BFT failed to retrieve transmission '{}.{}' from round {}",
680                                fmt_id(transmission_id),
681                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
682                                certificate.round()
683                            );
684                        };
685                        // Insert the transaction ID or solution ID into the map.
686                        match transmission_id {
687                            TransmissionID::Solution(id, _) => {
688                                seen_solution_ids.insert(id);
689                            }
690                            TransmissionID::Transaction(id, _) => {
691                                seen_transaction_ids.insert(id);
692                            }
693                            TransmissionID::Ratification => {}
694                        }
695                        // Add the transmission to the set.
696                        transmissions.insert(*transmission_id, transmission);
697                    }
698                }
699                // Trigger consensus, as this will build a new block for the ledger.
700                // Construct the subdag.
701                let subdag = Subdag::from(commit_subdag.clone())?;
702                // Retrieve the anchor round.
703                let anchor_round = subdag.anchor_round();
704                // Retrieve the number of transmissions.
705                let num_transmissions = transmissions.len();
706                // Retrieve metadata about the subdag.
707                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
708
709                // Ensure the subdag anchor round matches the leader round.
710                ensure!(
711                    anchor_round == leader_round,
712                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
713                );
714
715                // Trigger consensus.
716                if let Some(consensus_sender) = self.consensus_sender.get() {
717                    // Initialize a callback sender and receiver.
718                    let (callback_sender, callback_receiver) = oneshot::channel();
719                    // Send the subdag and transmissions to consensus.
720                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
721                    // Await the callback to continue.
722                    match callback_receiver.await {
723                        Ok(Ok(())) => (), // continue
724                        Ok(Err(e)) => {
725                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
726                            return Ok(());
727                        }
728                        Err(e) => {
729                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
730                            return Ok(());
731                        }
732                    }
733                }
734
735                info!(
736                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
737                );
738            }
739
740            // Update the DAG, as the subdag was successfully included into a block.
741            let mut dag_write = self.dag.write();
742            for certificate in commit_subdag.values().flatten() {
743                dag_write.commit(certificate, self.storage().max_gc_rounds());
744            }
745
746            // Update the validator telemetry.
747            #[cfg(feature = "telemetry")]
748            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
749        }
750
751        // Perform garbage collection based on the latest committed leader round.
752        // The protocol guarantees that validators commit the same anchors in the same order,
753        // but they may do so in different chunks of anchors,
754        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
755        // Doing garbage collection at the end of each chunk (as we do here),
756        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
757        // may give raise to a discrepancy between the DAGs of different validators who commit different chunks:
758        // one validator may have more certificates than the other, not yet garbage collected.
759        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
760        // it excludes certificates that are below the GC round,
761        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
762        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
763        // so long as garbage collection is done after each chunk.
764        // If garbage collection were done after each committed certificate,
765        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
766        self.storage().garbage_collect_certificates(latest_leader_round);
767
768        Ok(())
769    }
770
771    /// Returns the subdag of batch certificates to commit.
772    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
773        &self,
774        leader_certificate: BatchCertificate<N>,
775    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
776        // Initialize a map for the certificates to commit.
777        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
778        // Initialize a set for the already ordered certificates.
779        let mut already_ordered = HashSet::new();
780        // Initialize a buffer for the certificates to order.
781        let mut buffer = vec![leader_certificate];
782        // Iterate over the certificates to order.
783        while let Some(certificate) = buffer.pop() {
784            // Insert the certificate into the map.
785            commit.entry(certificate.round()).or_default().insert(certificate.clone());
786
787            // Check if the previous certificate is below the GC round.
788            // This is currently a critical check to prevent forking,
789            // as explained in the comment at the end of `commit_leader_certificate()`,
790            // just before the call to garbage collection.
791            let previous_round = certificate.round().saturating_sub(1);
792            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
793                continue;
794            }
795            // Iterate over the previous certificate IDs.
796            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
797            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
798            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
799                // If the previous certificate is already ordered, continue.
800                if already_ordered.contains(previous_certificate_id) {
801                    continue;
802                }
803                // If the previous certificate was recently committed, continue.
804                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
805                    continue;
806                }
807                // If the previous certificate already exists in the ledger, continue.
808                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
809                    continue;
810                }
811
812                // Retrieve the previous certificate.
813                let previous_certificate = {
814                    // Start by retrieving the previous certificate from the DAG.
815                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
816                        // If the previous certificate is found, return it.
817                        Some(previous_certificate) => previous_certificate,
818                        // If the previous certificate is not found, retrieve it from the storage.
819                        None => match self.storage().get_certificate(*previous_certificate_id) {
820                            // If the previous certificate is found, return it.
821                            Some(previous_certificate) => previous_certificate,
822                            // Otherwise, the previous certificate is missing, and throw an error.
823                            None => bail!(
824                                "Missing previous certificate {} for round {previous_round}",
825                                fmt_id(previous_certificate_id)
826                            ),
827                        },
828                    }
829                };
830                // Insert the previous certificate into the set of already ordered certificates.
831                already_ordered.insert(previous_certificate.id());
832                // Insert the previous certificate into the buffer.
833                buffer.push(previous_certificate);
834            }
835        }
836        // Ensure we only retain certificates that are above the GC round.
837        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
838        // Return the certificates to commit.
839        Ok(commit)
840    }
841
842    /// Returns `true` if there is a path from the previous certificate to the current certificate.
843    fn is_linked(
844        &self,
845        previous_certificate: BatchCertificate<N>,
846        current_certificate: BatchCertificate<N>,
847    ) -> Result<bool> {
848        // Initialize the list containing the traversal.
849        let mut traversal = vec![current_certificate.clone()];
850        // Iterate over the rounds from the current certificate to the previous certificate.
851        for round in (previous_certificate.round()..current_certificate.round()).rev() {
852            // Retrieve all of the certificates for this past round.
853            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
854                // This is a critical error, as the traversal should have these certificates.
855                // If this error is hit, it is likely that the maximum GC rounds should be increased.
856                bail!("BFT failed to retrieve the certificates for past round {round}");
857            };
858            // Filter the certificates to only include those that are in the traversal.
859            traversal = certificates
860                .into_values()
861                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
862                .collect();
863        }
864        Ok(traversal.contains(&previous_certificate))
865    }
866}
867
868impl<N: Network> BFT<N> {
869    /// Starts the BFT handlers.
870    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
871        let BFTReceiver {
872            mut rx_primary_round,
873            mut rx_primary_certificate,
874            mut rx_sync_bft_dag_at_bootup,
875            mut rx_sync_bft,
876        } = bft_receiver;
877
878        // Process the current round from the primary.
879        let self_ = self.clone();
880        self.spawn(async move {
881            while let Some((current_round, callback)) = rx_primary_round.recv().await {
882                callback.send(self_.update_to_next_round(current_round)).ok();
883            }
884        });
885
886        // Process the certificate from the primary.
887        let self_ = self.clone();
888        self.spawn(async move {
889            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
890                // Update the DAG with the certificate.
891                let result = self_.update_dag::<true, false>(certificate).await;
892                // Send the callback **after** updating the DAG.
893                // Note: We must await the DAG update before proceeding.
894                callback.send(result).ok();
895            }
896        });
897
898        // Process the request to sync the BFT DAG at bootup.
899        let self_ = self.clone();
900        self.spawn(async move {
901            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
902                self_.sync_bft_dag_at_bootup(certificates).await;
903            }
904        });
905
906        // Handler for new certificates that were fetched by the sync module.
907        let self_ = self.clone();
908        self.spawn(async move {
909            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
910                // Update the DAG with the certificate.
911                let result = self_.update_dag::<true, true>(certificate).await;
912                // Send the callback **after** updating the DAG.
913                // Note: We must await the DAG update before proceeding.
914                callback.send(result).ok();
915            }
916        });
917    }
918
919    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
920    /// already exist in the ledger.
921    ///
922    /// This method commits all the certificates into the DAG.
923    /// Note that there is no need to insert the certificates into the DAG, because these certificates
924    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
925    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
926        // Acquire the BFT write lock.
927        let mut dag = self.dag.write();
928
929        // Commit all the certificates.
930        for certificate in certificates {
931            dag.commit(&certificate, self.storage().max_gc_rounds());
932        }
933    }
934
935    /// Spawns a task with the given future; it should only be used for long-running tasks.
936    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
937        self.handles.lock().push(tokio::spawn(future));
938    }
939
940    /// Shuts down the BFT.
941    pub async fn shut_down(&self) {
942        info!("Shutting down the BFT...");
943        // Acquire the lock.
944        let _lock = self.lock.lock().await;
945        // Shut down the primary.
946        self.primary.shut_down().await;
947        // Abort the tasks.
948        self.handles.lock().iter().for_each(|handle| handle.abort());
949    }
950}
951
952#[cfg(test)]
953mod tests {
954    use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
955    use snarkos_account::Account;
956    use snarkos_node_bft_ledger_service::MockLedgerService;
957    use snarkos_node_bft_storage_service::BFTMemoryService;
958    use snarkos_node_sync::BlockSync;
959    use snarkvm::{
960        console::account::{Address, PrivateKey},
961        ledger::{
962            committee::Committee,
963            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
964        },
965        utilities::TestRng,
966    };
967
968    use aleo_std::StorageMode;
969    use anyhow::Result;
970    use indexmap::{IndexMap, IndexSet};
971    use std::sync::Arc;
972
973    type CurrentNetwork = snarkvm::console::network::MainnetV0;
974
975    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
976    fn sample_test_instance(
977        committee_round: Option<u64>,
978        max_gc_rounds: u64,
979        rng: &mut TestRng,
980    ) -> (
981        Committee<CurrentNetwork>,
982        Account<CurrentNetwork>,
983        Arc<MockLedgerService<CurrentNetwork>>,
984        Storage<CurrentNetwork>,
985    ) {
986        let committee = match committee_round {
987            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
988            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
989        };
990        let account = Account::new(rng).unwrap();
991        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
992        let transmissions = Arc::new(BFTMemoryService::new());
993        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
994
995        (committee, account, ledger, storage)
996    }
997
998    // Helper function to set up BFT for testing.
999    fn initialize_bft(
1000        account: Account<CurrentNetwork>,
1001        storage: Storage<CurrentNetwork>,
1002        ledger: Arc<MockLedgerService<CurrentNetwork>>,
1003    ) -> anyhow::Result<BFT<CurrentNetwork>> {
1004        // Create the block synchronization logic.
1005        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1006        // Initialize the BFT.
1007        BFT::new(
1008            account.clone(),
1009            storage.clone(),
1010            ledger.clone(),
1011            block_sync,
1012            None,
1013            &[],
1014            false,
1015            StorageMode::new_test(None),
1016            None,
1017        )
1018    }
1019
1020    #[test]
1021    #[tracing_test::traced_test]
1022    fn test_is_leader_quorum_odd() -> Result<()> {
1023        let rng = &mut TestRng::default();
1024
1025        // Sample batch certificates.
1026        let mut certificates = IndexSet::new();
1027        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1028        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1029        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1030        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1031
1032        // Initialize the committee.
1033        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1034            1,
1035            vec![
1036                certificates[0].author(),
1037                certificates[1].author(),
1038                certificates[2].author(),
1039                certificates[3].author(),
1040            ],
1041            rng,
1042        );
1043
1044        // Initialize the ledger.
1045        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1046        // Initialize the storage.
1047        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1048        // Initialize the account.
1049        let account = Account::new(rng)?;
1050        // Initialize the BFT.
1051        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1052        assert!(bft.is_timer_expired());
1053        // Ensure this call succeeds on an odd round.
1054        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1055        // If timer has expired but quorum threshold is not reached, return 'false'.
1056        assert!(!result);
1057        // Insert certificates into storage.
1058        for certificate in certificates.iter() {
1059            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1060        }
1061        // Ensure this call succeeds on an odd round.
1062        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1063        assert!(result); // no previous leader certificate
1064        // Set the leader certificate.
1065        let leader_certificate = sample_batch_certificate(rng);
1066        *bft.leader_certificate.write() = Some(leader_certificate);
1067        // Ensure this call succeeds on an odd round.
1068        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1069        assert!(result); // should now fall through to the end of function
1070
1071        Ok(())
1072    }
1073
1074    #[test]
1075    #[tracing_test::traced_test]
1076    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1077        let rng = &mut TestRng::default();
1078
1079        // Sample the test instance.
1080        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1081        assert_eq!(committee.starting_round(), 1);
1082        assert_eq!(storage.current_round(), 1);
1083        assert_eq!(storage.max_gc_rounds(), 10);
1084
1085        // Set up the BFT logic.
1086        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1087        assert!(bft.is_timer_expired());
1088
1089        // Store is at round 1, and we are checking for round 2.
1090        // Ensure this call fails on an even round.
1091        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1092        assert!(!result);
1093        Ok(())
1094    }
1095
1096    #[test]
1097    #[tracing_test::traced_test]
1098    fn test_is_leader_quorum_even() -> Result<()> {
1099        let rng = &mut TestRng::default();
1100
1101        // Sample the test instance.
1102        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1103        assert_eq!(committee.starting_round(), 2);
1104        assert_eq!(storage.current_round(), 2);
1105        assert_eq!(storage.max_gc_rounds(), 10);
1106
1107        // Set up the BFT logic.
1108        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1109        assert!(bft.is_timer_expired());
1110
1111        // Ensure this call fails on an even round.
1112        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1113        assert!(!result);
1114        Ok(())
1115    }
1116
1117    #[test]
1118    #[tracing_test::traced_test]
1119    fn test_is_even_round_ready() -> Result<()> {
1120        let rng = &mut TestRng::default();
1121
1122        // Sample batch certificates.
1123        let mut certificates = IndexSet::new();
1124        certificates.insert(sample_batch_certificate_for_round(2, rng));
1125        certificates.insert(sample_batch_certificate_for_round(2, rng));
1126        certificates.insert(sample_batch_certificate_for_round(2, rng));
1127        certificates.insert(sample_batch_certificate_for_round(2, rng));
1128
1129        // Initialize the committee.
1130        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1131            2,
1132            vec![
1133                certificates[0].author(),
1134                certificates[1].author(),
1135                certificates[2].author(),
1136                certificates[3].author(),
1137            ],
1138            rng,
1139        );
1140
1141        // Initialize the ledger.
1142        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1143        // Initialize the storage.
1144        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1145        // Initialize the account.
1146        let account = Account::new(rng)?;
1147
1148        // Set up the BFT logic.
1149        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1150        assert!(bft.is_timer_expired());
1151
1152        // Set the leader certificate.
1153        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1154        *bft.leader_certificate.write() = Some(leader_certificate);
1155        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1156        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1157        assert!(!result);
1158        // Once quorum threshold is reached, we are ready for the next round.
1159        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1160        assert!(result);
1161
1162        // Initialize a new BFT.
1163        let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1164        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1165        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1166        if !bft_timer.is_timer_expired() {
1167            assert!(!result);
1168        }
1169        // Wait for the timer to expire.
1170        let leader_certificate_timeout =
1171            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1172        std::thread::sleep(leader_certificate_timeout);
1173        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1174        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1175        if bft_timer.is_timer_expired() {
1176            assert!(result);
1177        } else {
1178            assert!(!result);
1179        }
1180
1181        Ok(())
1182    }
1183
1184    #[test]
1185    #[tracing_test::traced_test]
1186    fn test_update_leader_certificate_odd() -> Result<()> {
1187        let rng = &mut TestRng::default();
1188
1189        // Sample the test instance.
1190        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1191        assert_eq!(storage.max_gc_rounds(), 10);
1192
1193        // Initialize the BFT.
1194        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1195        assert!(bft.is_timer_expired());
1196
1197        // Ensure this call fails on an odd round.
1198        let result = bft.update_leader_certificate_to_even_round(1);
1199        assert!(!result);
1200        Ok(())
1201    }
1202
1203    #[test]
1204    #[tracing_test::traced_test]
1205    fn test_update_leader_certificate_bad_round() -> Result<()> {
1206        let rng = &mut TestRng::default();
1207
1208        // Sample the test instance.
1209        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1210        assert_eq!(storage.max_gc_rounds(), 10);
1211
1212        // Initialize the BFT.
1213        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1214
1215        // Ensure this call succeeds on an even round.
1216        let result = bft.update_leader_certificate_to_even_round(6);
1217        assert!(!result);
1218        Ok(())
1219    }
1220
1221    #[test]
1222    #[tracing_test::traced_test]
1223    fn test_update_leader_certificate_even() -> Result<()> {
1224        let rng = &mut TestRng::default();
1225
1226        // Set the current round.
1227        let current_round = 3;
1228
1229        // Sample the certificates.
1230        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1231            current_round,
1232            rng,
1233        );
1234
1235        // Initialize the committee.
1236        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1237            2,
1238            vec![
1239                certificates[0].author(),
1240                certificates[1].author(),
1241                certificates[2].author(),
1242                certificates[3].author(),
1243            ],
1244            rng,
1245        );
1246
1247        // Initialize the ledger.
1248        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1249
1250        // Initialize the storage.
1251        let transmissions = Arc::new(BFTMemoryService::new());
1252        let storage = Storage::new(ledger.clone(), transmissions, 10);
1253        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1254        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1255        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1256        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1257        assert_eq!(storage.current_round(), 2);
1258
1259        // Retrieve the leader certificate.
1260        let leader = committee.get_leader(2).unwrap();
1261        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1262
1263        // Initialize the BFT.
1264        let account = Account::new(rng)?;
1265        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1266
1267        // Set the leader certificate.
1268        *bft.leader_certificate.write() = Some(leader_certificate);
1269
1270        // Update the leader certificate.
1271        // Ensure this call succeeds on an even round.
1272        let result = bft.update_leader_certificate_to_even_round(2);
1273        assert!(result);
1274
1275        Ok(())
1276    }
1277
1278    #[tokio::test]
1279    #[tracing_test::traced_test]
1280    async fn test_order_dag_with_dfs() -> Result<()> {
1281        let rng = &mut TestRng::default();
1282
1283        // Sample the test instance.
1284        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1285
1286        // Initialize the round parameters.
1287        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1288        let current_round = previous_round + 1;
1289
1290        // Sample the current certificate and previous certificates.
1291        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1292            current_round,
1293            rng,
1294        );
1295
1296        /* Test GC */
1297
1298        // Ensure the function succeeds in returning only certificates above GC.
1299        {
1300            // Initialize the storage.
1301            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1302            // Initialize the BFT.
1303            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1304
1305            // Insert a mock DAG in the BFT.
1306            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1307
1308            // Insert the previous certificates into the BFT.
1309            for certificate in previous_certificates.clone() {
1310                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1311            }
1312
1313            // Ensure this call succeeds and returns all given certificates.
1314            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1315            assert!(result.is_ok());
1316            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1317            assert_eq!(candidate_certificates.len(), 1);
1318            let expected_certificates = vec![certificate.clone()];
1319            assert_eq!(
1320                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1321                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1322            );
1323            assert_eq!(candidate_certificates, expected_certificates);
1324        }
1325
1326        /* Test normal case */
1327
1328        // Ensure the function succeeds in returning all given certificates.
1329        {
1330            // Initialize the storage.
1331            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1332            // Initialize the BFT.
1333            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1334
1335            // Insert a mock DAG in the BFT.
1336            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1337
1338            // Insert the previous certificates into the BFT.
1339            for certificate in previous_certificates.clone() {
1340                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1341            }
1342
1343            // Ensure this call succeeds and returns all given certificates.
1344            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1345            assert!(result.is_ok());
1346            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1347            assert_eq!(candidate_certificates.len(), 5);
1348            let expected_certificates = vec![
1349                previous_certificates[0].clone(),
1350                previous_certificates[1].clone(),
1351                previous_certificates[2].clone(),
1352                previous_certificates[3].clone(),
1353                certificate,
1354            ];
1355            assert_eq!(
1356                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1357                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1358            );
1359            assert_eq!(candidate_certificates, expected_certificates);
1360        }
1361
1362        Ok(())
1363    }
1364
1365    #[test]
1366    #[tracing_test::traced_test]
1367    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1368        let rng = &mut TestRng::default();
1369
1370        // Sample the test instance.
1371        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1372        assert_eq!(committee.starting_round(), 1);
1373        assert_eq!(storage.current_round(), 1);
1374        assert_eq!(storage.max_gc_rounds(), 1);
1375
1376        // Initialize the round parameters.
1377        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1378        let current_round = previous_round + 1;
1379
1380        // Sample the current certificate and previous certificates.
1381        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1382            current_round,
1383            rng,
1384        );
1385        // Construct the previous certificate IDs.
1386        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1387
1388        /* Test missing previous certificate. */
1389
1390        // Initialize the BFT.
1391        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1392
1393        // The expected error message.
1394        let error_msg = format!(
1395            "Missing previous certificate {} for round {previous_round}",
1396            crate::helpers::fmt_id(previous_certificate_ids[3]),
1397        );
1398
1399        // Ensure this call fails on a missing previous certificate.
1400        let result = bft.order_dag_with_dfs::<false>(certificate);
1401        assert!(result.is_err());
1402        assert_eq!(result.unwrap_err().to_string(), error_msg);
1403        Ok(())
1404    }
1405
1406    #[tokio::test]
1407    #[tracing_test::traced_test]
1408    async fn test_bft_gc_on_commit() -> Result<()> {
1409        let rng = &mut TestRng::default();
1410
1411        // Initialize the round parameters.
1412        let max_gc_rounds = 1;
1413        let committee_round = 0;
1414        let commit_round = 2;
1415        let current_round = commit_round + 1;
1416
1417        // Sample the certificates.
1418        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1419            current_round,
1420            rng,
1421        );
1422
1423        // Initialize the committee.
1424        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1425            committee_round,
1426            vec![
1427                certificates[0].author(),
1428                certificates[1].author(),
1429                certificates[2].author(),
1430                certificates[3].author(),
1431            ],
1432            rng,
1433        );
1434
1435        // Initialize the ledger.
1436        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1437
1438        // Initialize the storage.
1439        let transmissions = Arc::new(BFTMemoryService::new());
1440        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1441        // Insert the certificates into the storage.
1442        for certificate in certificates.iter() {
1443            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1444        }
1445
1446        // Get the leader certificate.
1447        let leader = committee.get_leader(commit_round).unwrap();
1448        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1449
1450        // Initialize the BFT.
1451        let account = Account::new(rng)?;
1452        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1453
1454        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1455
1456        // Ensure that the `gc_round` has not been updated yet.
1457        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1458
1459        // Insert the certificates into the BFT.
1460        for certificate in certificates {
1461            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1462        }
1463
1464        // Commit the leader certificate.
1465        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1466
1467        // Ensure that the `gc_round` has been updated.
1468        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1469
1470        Ok(())
1471    }
1472
1473    #[tokio::test]
1474    #[tracing_test::traced_test]
1475    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1476        let rng = &mut TestRng::default();
1477
1478        // Initialize the round parameters.
1479        let max_gc_rounds = 1;
1480        let committee_round = 0;
1481        let commit_round = 2;
1482        let current_round = commit_round + 1;
1483
1484        // Sample the current certificate and previous certificates.
1485        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1486            current_round,
1487            rng,
1488        );
1489
1490        // Initialize the committee.
1491        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1492            committee_round,
1493            vec![
1494                certificates[0].author(),
1495                certificates[1].author(),
1496                certificates[2].author(),
1497                certificates[3].author(),
1498            ],
1499            rng,
1500        );
1501
1502        // Initialize the ledger.
1503        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1504
1505        // Initialize the storage.
1506        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1507        // Insert the certificates into the storage.
1508        for certificate in certificates.iter() {
1509            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1510        }
1511
1512        // Get the leader certificate.
1513        let leader = committee.get_leader(commit_round).unwrap();
1514        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1515
1516        // Initialize the BFT.
1517        let account = Account::new(rng)?;
1518        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1519
1520        // Insert a mock DAG in the BFT.
1521        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1522
1523        // Insert the previous certificates into the BFT.
1524        for certificate in certificates.clone() {
1525            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1526        }
1527
1528        // Commit the leader certificate.
1529        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1530
1531        // Simulate a bootup of the BFT.
1532
1533        // Initialize a new instance of storage.
1534        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1535        // Initialize a new instance of BFT.
1536        let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1537
1538        // Sync the BFT DAG at bootup.
1539        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1540
1541        // Check that the BFT starts from the same last committed round.
1542        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1543
1544        // Ensure that both BFTs have committed the leader certificate.
1545        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1546        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1547
1548        // Check the state of the bootup BFT.
1549        for certificate in certificates {
1550            let certificate_round = certificate.round();
1551            let certificate_id = certificate.id();
1552            // Check that the bootup BFT has committed the certificates.
1553            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1554            // Check that the bootup BFT does not contain the certificates in its graph, because
1555            // it should not need to order them again in subsequent subdags.
1556            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1557        }
1558
1559        Ok(())
1560    }
1561
1562    #[tokio::test]
1563    #[tracing_test::traced_test]
1564    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1565        /*
1566        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1567        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1568        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1569        */
1570
1571        let rng = &mut TestRng::default();
1572
1573        // Initialize the round parameters.
1574        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1575        let committee_round = 0;
1576        let commit_round = 2;
1577        let current_round = commit_round + 1;
1578        let next_round = current_round + 1;
1579
1580        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1581        let (round_to_certificates_map, committee) = {
1582            let private_keys = vec![
1583                PrivateKey::new(rng).unwrap(),
1584                PrivateKey::new(rng).unwrap(),
1585                PrivateKey::new(rng).unwrap(),
1586                PrivateKey::new(rng).unwrap(),
1587            ];
1588            let addresses = vec![
1589                Address::try_from(private_keys[0])?,
1590                Address::try_from(private_keys[1])?,
1591                Address::try_from(private_keys[2])?,
1592                Address::try_from(private_keys[3])?,
1593            ];
1594            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1595                committee_round,
1596                addresses,
1597                rng,
1598            );
1599            // Initialize a mapping from the round number to the set of batch certificates in the round.
1600            let mut round_to_certificates_map: IndexMap<
1601                u64,
1602                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1603            > = IndexMap::new();
1604            let mut previous_certificates = IndexSet::with_capacity(4);
1605            // Initialize the genesis batch certificates.
1606            for _ in 0..4 {
1607                previous_certificates.insert(sample_batch_certificate(rng));
1608            }
1609            for round in 0..commit_round + 3 {
1610                let mut current_certificates = IndexSet::new();
1611                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1612                    IndexSet::new()
1613                } else {
1614                    previous_certificates.iter().map(|c| c.id()).collect()
1615                };
1616                let transmission_ids =
1617                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1618                        .into_iter()
1619                        .collect::<IndexSet<_>>();
1620                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1621                let committee_id = committee.id();
1622                for (i, private_key_1) in private_keys.iter().enumerate() {
1623                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1624                        private_key_1,
1625                        round,
1626                        timestamp,
1627                        committee_id,
1628                        transmission_ids.clone(),
1629                        previous_certificate_ids.clone(),
1630                        rng,
1631                    )
1632                    .unwrap();
1633                    let mut signatures = IndexSet::with_capacity(4);
1634                    for (j, private_key_2) in private_keys.iter().enumerate() {
1635                        if i != j {
1636                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1637                        }
1638                    }
1639                    let certificate =
1640                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1641                    current_certificates.insert(certificate);
1642                }
1643                // Update the mapping.
1644                round_to_certificates_map.insert(round, current_certificates.clone());
1645                previous_certificates = current_certificates.clone();
1646            }
1647            (round_to_certificates_map, committee)
1648        };
1649
1650        // Initialize the ledger.
1651        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1652        // Initialize the storage.
1653        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1654        // Get the leaders for the next 2 commit rounds.
1655        let leader = committee.get_leader(commit_round).unwrap();
1656        let next_leader = committee.get_leader(next_round).unwrap();
1657        // Insert the pre shutdown certificates into the storage.
1658        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1659        for i in 1..=commit_round {
1660            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1661            if i == commit_round {
1662                // Only insert the leader certificate for the commit round.
1663                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1664                if let Some(c) = leader_certificate {
1665                    pre_shutdown_certificates.push(c.clone());
1666                }
1667                continue;
1668            }
1669            pre_shutdown_certificates.extend(certificates);
1670        }
1671        for certificate in pre_shutdown_certificates.iter() {
1672            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1673        }
1674        // Insert the post shutdown certificates into the storage.
1675        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1676            Vec::new();
1677        for j in commit_round..=commit_round + 2 {
1678            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1679            post_shutdown_certificates.extend(certificate);
1680        }
1681        for certificate in post_shutdown_certificates.iter() {
1682            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1683        }
1684        // Get the leader certificates.
1685        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1686        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1687
1688        // Initialize the BFT without bootup.
1689        let account = Account::new(rng)?;
1690        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1691
1692        // Insert a mock DAG in the BFT without bootup.
1693        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1694
1695        // Insert the certificates into the BFT without bootup.
1696        for certificate in pre_shutdown_certificates.clone() {
1697            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1698        }
1699
1700        // Insert the post shutdown certificates into the BFT without bootup.
1701        for certificate in post_shutdown_certificates.clone() {
1702            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1703        }
1704        // Commit the second leader certificate.
1705        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1706        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1707        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1708
1709        // Simulate a bootup of the BFT.
1710
1711        // Initialize a new instance of storage.
1712        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1713
1714        // Initialize a new instance of BFT with bootup.
1715        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1716
1717        // Sync the BFT DAG at bootup.
1718        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1719
1720        // Insert the post shutdown certificates to the storage and BFT with bootup.
1721        for certificate in post_shutdown_certificates.iter() {
1722            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1723        }
1724        for certificate in post_shutdown_certificates.clone() {
1725            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1726        }
1727        // Commit the second leader certificate.
1728        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1729        let commit_subdag_metadata_bootup =
1730            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1731        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1732        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1733
1734        // Check that the final state of both BFTs is the same.
1735
1736        // Check that both BFTs start from the same last committed round.
1737        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1738
1739        // Ensure that both BFTs have committed the leader certificates.
1740        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1741        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1742        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1743        assert!(
1744            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1745        );
1746
1747        // Check that the bootup BFT has committed the pre shutdown certificates.
1748        for certificate in pre_shutdown_certificates.clone() {
1749            let certificate_round = certificate.round();
1750            let certificate_id = certificate.id();
1751            // Check that both BFTs have committed the certificates.
1752            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1753            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754            // Check that the bootup BFT does not contain the certificates in its graph, because
1755            // it should not need to order them again in subsequent subdags.
1756            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1757            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758        }
1759
1760        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1761        for certificate in committed_certificates_bootup.clone() {
1762            let certificate_round = certificate.round();
1763            let certificate_id = certificate.id();
1764            // Check that the both BFTs have committed the certificates.
1765            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1766            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1767            // Check that the bootup BFT does not contain the certificates in its graph, because
1768            // it should not need to order them again in subsequent subdags.
1769            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1770            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1771        }
1772
1773        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1774        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1775
1776        Ok(())
1777    }
1778
1779    #[tokio::test]
1780    #[tracing_test::traced_test]
1781    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1782        /*
1783        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1784        2. Add post shutdown certificates to the bootup BFT.
1785        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1786        */
1787
1788        let rng = &mut TestRng::default();
1789
1790        // Initialize the round parameters.
1791        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1792        let committee_round = 0;
1793        let commit_round = 2;
1794        let current_round = commit_round + 1;
1795        let next_round = current_round + 1;
1796
1797        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1798        let (round_to_certificates_map, committee) = {
1799            let private_keys = vec![
1800                PrivateKey::new(rng).unwrap(),
1801                PrivateKey::new(rng).unwrap(),
1802                PrivateKey::new(rng).unwrap(),
1803                PrivateKey::new(rng).unwrap(),
1804            ];
1805            let addresses = vec![
1806                Address::try_from(private_keys[0])?,
1807                Address::try_from(private_keys[1])?,
1808                Address::try_from(private_keys[2])?,
1809                Address::try_from(private_keys[3])?,
1810            ];
1811            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1812                committee_round,
1813                addresses,
1814                rng,
1815            );
1816            // Initialize a mapping from the round number to the set of batch certificates in the round.
1817            let mut round_to_certificates_map: IndexMap<
1818                u64,
1819                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1820            > = IndexMap::new();
1821            let mut previous_certificates = IndexSet::with_capacity(4);
1822            // Initialize the genesis batch certificates.
1823            for _ in 0..4 {
1824                previous_certificates.insert(sample_batch_certificate(rng));
1825            }
1826            for round in 0..=commit_round + 2 {
1827                let mut current_certificates = IndexSet::new();
1828                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1829                    IndexSet::new()
1830                } else {
1831                    previous_certificates.iter().map(|c| c.id()).collect()
1832                };
1833                let transmission_ids =
1834                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1835                        .into_iter()
1836                        .collect::<IndexSet<_>>();
1837                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1838                let committee_id = committee.id();
1839                for (i, private_key_1) in private_keys.iter().enumerate() {
1840                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1841                        private_key_1,
1842                        round,
1843                        timestamp,
1844                        committee_id,
1845                        transmission_ids.clone(),
1846                        previous_certificate_ids.clone(),
1847                        rng,
1848                    )
1849                    .unwrap();
1850                    let mut signatures = IndexSet::with_capacity(4);
1851                    for (j, private_key_2) in private_keys.iter().enumerate() {
1852                        if i != j {
1853                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1854                        }
1855                    }
1856                    let certificate =
1857                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1858                    current_certificates.insert(certificate);
1859                }
1860                // Update the mapping.
1861                round_to_certificates_map.insert(round, current_certificates.clone());
1862                previous_certificates = current_certificates.clone();
1863            }
1864            (round_to_certificates_map, committee)
1865        };
1866
1867        // Initialize the ledger.
1868        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1869        // Initialize the storage.
1870        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1871        // Get the leaders for the next 2 commit rounds.
1872        let leader = committee.get_leader(commit_round).unwrap();
1873        let next_leader = committee.get_leader(next_round).unwrap();
1874        // Insert the pre shutdown certificates into the storage.
1875        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1876        for i in 1..=commit_round {
1877            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1878            if i == commit_round {
1879                // Only insert the leader certificate for the commit round.
1880                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1881                if let Some(c) = leader_certificate {
1882                    pre_shutdown_certificates.push(c.clone());
1883                }
1884                continue;
1885            }
1886            pre_shutdown_certificates.extend(certificates);
1887        }
1888        for certificate in pre_shutdown_certificates.iter() {
1889            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1890        }
1891        // Initialize the bootup BFT.
1892        let account = Account::new(rng)?;
1893        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1894
1895        // Insert a mock DAG in the BFT without bootup.
1896        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1897        // Sync the BFT DAG at bootup.
1898        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1899
1900        // Insert the post shutdown certificates into the storage.
1901        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1902            Vec::new();
1903        for j in commit_round..=commit_round + 2 {
1904            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1905            post_shutdown_certificates.extend(certificate);
1906        }
1907        for certificate in post_shutdown_certificates.iter() {
1908            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1909        }
1910
1911        // Insert the post shutdown certificates into the DAG.
1912        for certificate in post_shutdown_certificates.clone() {
1913            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1914        }
1915
1916        // Get the next leader certificate to commit.
1917        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1918        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1919        let committed_certificates = commit_subdag.values().flatten();
1920
1921        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1922        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1923            for committed_certificate in committed_certificates.clone() {
1924                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1925            }
1926        }
1927        Ok(())
1928    }
1929}