snarkos_node_bft/
bft.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use snarkos_account::Account;
32use snarkos_node_bft_ledger_service::LedgerService;
33use snarkvm::{
34    console::account::Address,
35    ledger::{
36        block::Transaction,
37        committee::Committee,
38        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
39        puzzle::{Solution, SolutionID},
40    },
41    prelude::{Field, Network, Result, bail, ensure},
42};
43
44use colored::Colorize;
45use indexmap::{IndexMap, IndexSet};
46use parking_lot::{Mutex, RwLock};
47use std::{
48    collections::{BTreeMap, HashSet},
49    future::Future,
50    net::SocketAddr,
51    sync::{
52        Arc,
53        atomic::{AtomicI64, Ordering},
54    },
55};
56use tokio::{
57    sync::{Mutex as TMutex, OnceCell, oneshot},
58    task::JoinHandle,
59};
60
61#[derive(Clone)]
62pub struct BFT<N: Network> {
63    /// The primary.
64    primary: Primary<N>,
65    /// The DAG.
66    dag: Arc<RwLock<DAG<N>>>,
67    /// The batch certificate of the leader from the current even round, if one was present.
68    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
69    /// The timer for the leader certificate to be received.
70    leader_certificate_timer: Arc<AtomicI64>,
71    /// The consensus sender.
72    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
73    /// The spawned handles.
74    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
75    /// The BFT lock.
76    lock: Arc<TMutex<()>>,
77}
78
79impl<N: Network> BFT<N> {
80    /// Initializes a new instance of the BFT.
81    pub fn new(
82        account: Account<N>,
83        storage: Storage<N>,
84        ledger: Arc<dyn LedgerService<N>>,
85        ip: Option<SocketAddr>,
86        trusted_validators: &[SocketAddr],
87        dev: Option<u16>,
88    ) -> Result<Self> {
89        Ok(Self {
90            primary: Primary::new(account, storage, ledger, ip, trusted_validators, dev)?,
91            dag: Default::default(),
92            leader_certificate: Default::default(),
93            leader_certificate_timer: Default::default(),
94            consensus_sender: Default::default(),
95            handles: Default::default(),
96            lock: Default::default(),
97        })
98    }
99
100    /// Run the BFT instance.
101    pub async fn run(
102        &mut self,
103        consensus_sender: Option<ConsensusSender<N>>,
104        primary_sender: PrimarySender<N>,
105        primary_receiver: PrimaryReceiver<N>,
106    ) -> Result<()> {
107        info!("Starting the BFT instance...");
108        // Initialize the BFT channels.
109        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
110        // First, start the BFT handlers.
111        self.start_handlers(bft_receiver);
112        // Next, run the primary instance.
113        self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
114        // Lastly, set the consensus sender.
115        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
116        if let Some(consensus_sender) = consensus_sender {
117            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
118        }
119        Ok(())
120    }
121
122    /// Returns `true` if the primary is synced.
123    pub fn is_synced(&self) -> bool {
124        self.primary.is_synced()
125    }
126
127    /// Returns the primary.
128    pub const fn primary(&self) -> &Primary<N> {
129        &self.primary
130    }
131
132    /// Returns the storage.
133    pub const fn storage(&self) -> &Storage<N> {
134        self.primary.storage()
135    }
136
137    /// Returns the ledger.
138    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
139        self.primary.ledger()
140    }
141
142    /// Returns the leader of the current even round, if one was present.
143    pub fn leader(&self) -> Option<Address<N>> {
144        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
145    }
146
147    /// Returns the certificate of the leader from the current even round, if one was present.
148    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
149        &self.leader_certificate
150    }
151}
152
153impl<N: Network> BFT<N> {
154    /// Returns the number of unconfirmed transmissions.
155    pub fn num_unconfirmed_transmissions(&self) -> usize {
156        self.primary.num_unconfirmed_transmissions()
157    }
158
159    /// Returns the number of unconfirmed ratifications.
160    pub fn num_unconfirmed_ratifications(&self) -> usize {
161        self.primary.num_unconfirmed_ratifications()
162    }
163
164    /// Returns the number of solutions.
165    pub fn num_unconfirmed_solutions(&self) -> usize {
166        self.primary.num_unconfirmed_solutions()
167    }
168
169    /// Returns the number of unconfirmed transactions.
170    pub fn num_unconfirmed_transactions(&self) -> usize {
171        self.primary.num_unconfirmed_transactions()
172    }
173}
174
175impl<N: Network> BFT<N> {
176    /// Returns the worker transmission IDs.
177    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
178        self.primary.worker_transmission_ids()
179    }
180
181    /// Returns the worker transmissions.
182    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
183        self.primary.worker_transmissions()
184    }
185
186    /// Returns the worker solutions.
187    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
188        self.primary.worker_solutions()
189    }
190
191    /// Returns the worker transactions.
192    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
193        self.primary.worker_transactions()
194    }
195}
196
197impl<N: Network> BFT<N> {
198    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
199    fn update_to_next_round(&self, current_round: u64) -> bool {
200        // Ensure the current round is at least the storage round (this is a sanity check).
201        let storage_round = self.storage().current_round();
202        if current_round < storage_round {
203            debug!(
204                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
205            );
206            return false;
207        }
208
209        // Determine if the BFT is ready to update to the next round.
210        let is_ready = match current_round % 2 == 0 {
211            true => self.update_leader_certificate_to_even_round(current_round),
212            false => self.is_leader_quorum_or_nonleaders_available(current_round),
213        };
214
215        #[cfg(feature = "metrics")]
216        {
217            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
218            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
219            if start > 0 {
220                let end = now();
221                let elapsed = std::time::Duration::from_secs((end - start) as u64);
222                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
223            }
224        }
225
226        // Log whether the round is going to update.
227        if current_round % 2 == 0 {
228            // Determine if there is a leader certificate.
229            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
230                // Ensure the state of the leader certificate is consistent with the BFT being ready.
231                if !is_ready {
232                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
233                }
234                // Log the leader election.
235                let leader_round = leader_certificate.round();
236                match leader_round == current_round {
237                    true => {
238                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
239                        #[cfg(feature = "metrics")]
240                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
241                    }
242                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
243                }
244            } else {
245                match is_ready {
246                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
247                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
248                }
249            }
250        }
251
252        // If the BFT is ready, then update to the next round.
253        if is_ready {
254            // Update to the next round in storage.
255            if let Err(e) = self.storage().increment_to_next_round(current_round) {
256                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
257                return false;
258            }
259            // Update the timer for the leader certificate.
260            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
261        }
262
263        is_ready
264    }
265
266    /// Updates the leader certificate to the current even round,
267    /// returning `true` if the BFT is ready to update to the next round.
268    ///
269    /// This method runs on every even round, by determining the leader of the current even round,
270    /// and setting the leader certificate to their certificate in the round, if they were present.
271    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
272        // Retrieve the current round.
273        let current_round = self.storage().current_round();
274        // Ensure the current round matches the given round.
275        if current_round != even_round {
276            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
277            return false;
278        }
279
280        // If the current round is odd, return false.
281        if current_round % 2 != 0 || current_round < 2 {
282            error!("BFT cannot update the leader certificate in an odd round");
283            return false;
284        }
285
286        // Retrieve the certificates for the current round.
287        let current_certificates = self.storage().get_certificates_for_round(current_round);
288        // If there are no current certificates, set the leader certificate to 'None', and return early.
289        if current_certificates.is_empty() {
290            // Set the leader certificate to 'None'.
291            *self.leader_certificate.write() = None;
292            return false;
293        }
294
295        // Retrieve the committee lookback of the current round.
296        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
297            Ok(committee) => committee,
298            Err(e) => {
299                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
300                return false;
301            }
302        };
303        // Determine the leader of the current round.
304        let leader = match self.ledger().latest_leader() {
305            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
306            _ => {
307                // Compute the leader for the current round.
308                let computed_leader = match committee_lookback.get_leader(current_round) {
309                    Ok(leader) => leader,
310                    Err(e) => {
311                        error!("BFT failed to compute the leader for the even round {current_round} - {e}");
312                        return false;
313                    }
314                };
315
316                // Cache the computed leader.
317                self.ledger().update_latest_leader(current_round, computed_leader);
318
319                computed_leader
320            }
321        };
322        // Find and set the leader certificate, if the leader was present in the current even round.
323        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
324        *self.leader_certificate.write() = leader_certificate.cloned();
325
326        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
327    }
328
329    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
330    ///  - If the leader certificate is set for the current even round.
331    ///  - The timer for the leader certificate has expired.
332    fn is_even_round_ready_for_next_round(
333        &self,
334        certificates: IndexSet<BatchCertificate<N>>,
335        committee: Committee<N>,
336        current_round: u64,
337    ) -> bool {
338        // Retrieve the authors for the current round.
339        let authors = certificates.into_iter().map(|c| c.author()).collect();
340        // Check if quorum threshold is reached.
341        if !committee.is_quorum_threshold_reached(&authors) {
342            trace!("BFT failed to reach quorum threshold in even round {current_round}");
343            return false;
344        }
345        // If the leader certificate is set for the current even round, return 'true'.
346        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
347            if leader_certificate.round() == current_round {
348                return true;
349            }
350        }
351        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
352        if self.is_timer_expired() {
353            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
354            return true;
355        }
356        // Otherwise, return 'false'.
357        false
358    }
359
360    /// Returns `true` if the timer for the leader certificate has expired.
361    fn is_timer_expired(&self) -> bool {
362        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
363    }
364
365    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
366    ///  - The leader certificate is `None`.
367    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
368    ///  - The leader certificate timer has expired.
369    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
370        // Retrieve the current round.
371        let current_round = self.storage().current_round();
372        // Ensure the current round matches the given round.
373        if current_round != odd_round {
374            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
375            return false;
376        }
377        // If the current round is even, return false.
378        if current_round % 2 != 1 {
379            error!("BFT does not compute stakes for the leader certificate in an even round");
380            return false;
381        }
382        // Retrieve the certificates for the current round.
383        let current_certificates = self.storage().get_certificates_for_round(current_round);
384        // Retrieve the committee lookback for the current round.
385        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
386            Ok(committee) => committee,
387            Err(e) => {
388                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
389                return false;
390            }
391        };
392        // Retrieve the authors of the current certificates.
393        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
394        // Check if quorum threshold is reached.
395        if !committee_lookback.is_quorum_threshold_reached(&authors) {
396            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
397            return false;
398        }
399        // Retrieve the leader certificate.
400        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
401            // If there is no leader certificate for the previous round, return 'true'.
402            return true;
403        };
404        // Compute the stake for the leader certificate.
405        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
406            leader_certificate.id(),
407            current_certificates,
408            &committee_lookback,
409        );
410        // Return 'true' if any of the following conditions hold:
411        stake_with_leader >= committee_lookback.availability_threshold()
412            || stake_without_leader >= committee_lookback.quorum_threshold()
413            || self.is_timer_expired()
414    }
415
416    /// Computes the amount of stake that has & has not signed for the leader certificate.
417    fn compute_stake_for_leader_certificate(
418        &self,
419        leader_certificate_id: Field<N>,
420        current_certificates: IndexSet<BatchCertificate<N>>,
421        current_committee: &Committee<N>,
422    ) -> (u64, u64) {
423        // If there are no current certificates, return early.
424        if current_certificates.is_empty() {
425            return (0, 0);
426        }
427
428        // Initialize a tracker for the stake with the leader.
429        let mut stake_with_leader = 0u64;
430        // Initialize a tracker for the stake without the leader.
431        let mut stake_without_leader = 0u64;
432        // Iterate over the current certificates.
433        for certificate in current_certificates {
434            // Retrieve the stake for the author of the certificate.
435            let stake = current_committee.get_stake(certificate.author());
436            // Determine if the certificate includes the leader.
437            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
438                // If the certificate includes the leader, add the stake to the stake with the leader.
439                true => stake_with_leader = stake_with_leader.saturating_add(stake),
440                // If the certificate does not include the leader, add the stake to the stake without the leader.
441                false => stake_without_leader = stake_without_leader.saturating_add(stake),
442            }
443        }
444        // Return the stake with the leader, and the stake without the leader.
445        (stake_with_leader, stake_without_leader)
446    }
447}
448
449impl<N: Network> BFT<N> {
450    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
451    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
452        &self,
453        certificate: BatchCertificate<N>,
454    ) -> Result<()> {
455        // Acquire the BFT lock.
456        let _lock = self.lock.lock().await;
457
458        // Retrieve the certificate round.
459        let certificate_round = certificate.round();
460        // Insert the certificate into the DAG.
461        self.dag.write().insert(certificate);
462
463        // Construct the commit round.
464        let commit_round = certificate_round.saturating_sub(1);
465        // If the commit round is odd, return early.
466        if commit_round % 2 != 0 || commit_round < 2 {
467            return Ok(());
468        }
469        // If the commit round is at or below the last committed round, return early.
470        if commit_round <= self.dag.read().last_committed_round() {
471            return Ok(());
472        }
473
474        /* Proceeding to check if the leader is ready to be committed. */
475        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
476
477        // Retrieve the committee lookback for the commit round.
478        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
479            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
480        };
481
482        // Either retrieve the cached leader or compute it.
483        let leader = match self.ledger().latest_leader() {
484            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
485            _ => {
486                // Compute the leader for the commit round.
487                let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
488                    bail!("BFT failed to compute the leader for commit round {commit_round}");
489                };
490
491                // Cache the computed leader.
492                self.ledger().update_latest_leader(commit_round, computed_leader);
493
494                computed_leader
495            }
496        };
497
498        // Retrieve the leader certificate for the commit round.
499        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
500        else {
501            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
502            return Ok(());
503        };
504        // Retrieve all of the certificates for the **certificate** round.
505        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
506            // TODO (howardwu): Investigate how many certificates we should have at this point.
507            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
508        };
509        // Construct a set over the authors who included the leader's certificate in the certificate round.
510        let authors = certificates
511            .values()
512            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
513                true => Some(c.author()),
514                false => None,
515            })
516            .collect();
517        // Check if the leader is ready to be committed.
518        if !committee_lookback.is_availability_threshold_reached(&authors) {
519            // If the leader is not ready to be committed, return early.
520            trace!("BFT is not ready to commit {commit_round}");
521            return Ok(());
522        }
523
524        /* Proceeding to commit the leader. */
525        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
526
527        // Commit the leader certificate, and all previous leader certificates since the last committed round.
528        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
529    }
530
531    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
532    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
533        &self,
534        leader_certificate: BatchCertificate<N>,
535    ) -> Result<()> {
536        // Fetch the leader round.
537        let latest_leader_round = leader_certificate.round();
538        // Determine the list of all previous leader certificates since the last committed round.
539        // The order of the leader certificates is from **newest** to **oldest**.
540        let mut leader_certificates = vec![leader_certificate.clone()];
541        {
542            // Retrieve the leader round.
543            let leader_round = leader_certificate.round();
544
545            let mut current_certificate = leader_certificate;
546            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
547            {
548                // Retrieve the previous committee for the leader round.
549                let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
550                    Ok(committee) => committee,
551                    Err(e) => {
552                        bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
553                    }
554                };
555                // Either retrieve the cached leader or compute it.
556                let leader = match self.ledger().latest_leader() {
557                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
558                    _ => {
559                        // Compute the leader for the commit round.
560                        let computed_leader = match previous_committee_lookback.get_leader(round) {
561                            Ok(leader) => leader,
562                            Err(e) => {
563                                bail!("BFT failed to compute the leader for the even round {round} - {e}");
564                            }
565                        };
566
567                        // Cache the computed leader.
568                        self.ledger().update_latest_leader(round, computed_leader);
569
570                        computed_leader
571                    }
572                };
573                // Retrieve the previous leader certificate.
574                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
575                else {
576                    continue;
577                };
578                // Determine if there is a path between the previous certificate and the current certificate.
579                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
580                    // Add the previous leader certificate to the list of certificates to commit.
581                    leader_certificates.push(previous_certificate.clone());
582                    // Update the current certificate to the previous leader certificate.
583                    current_certificate = previous_certificate;
584                }
585            }
586        }
587
588        // Iterate over the leader certificates to commit.
589        for leader_certificate in leader_certificates.into_iter().rev() {
590            // Retrieve the leader certificate round.
591            let leader_round = leader_certificate.round();
592            // Compute the commit subdag.
593            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
594                Ok(subdag) => subdag,
595                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
596            };
597            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
598            if !IS_SYNCING {
599                // Initialize a map for the deduped transmissions.
600                let mut transmissions = IndexMap::new();
601                // Initialize a map for the deduped transaction ids.
602                let mut seen_transaction_ids = IndexSet::new();
603                // Initialize a map for the deduped solution ids.
604                let mut seen_solution_ids = IndexSet::new();
605                // Start from the oldest leader certificate.
606                for certificate in commit_subdag.values().flatten() {
607                    // Retrieve the transmissions.
608                    for transmission_id in certificate.transmission_ids() {
609                        // If the transaction ID or solution ID already exists in the map, skip it.
610                        // Note: This additional check is done to ensure that we do not include duplicate
611                        // transaction IDs or solution IDs that may have a different transmission ID.
612                        match transmission_id {
613                            TransmissionID::Solution(solution_id, _) => {
614                                // If the solution already exists, skip it.
615                                if seen_solution_ids.contains(&solution_id) {
616                                    continue;
617                                }
618                            }
619                            TransmissionID::Transaction(transaction_id, _) => {
620                                // If the transaction already exists, skip it.
621                                if seen_transaction_ids.contains(transaction_id) {
622                                    continue;
623                                }
624                            }
625                            TransmissionID::Ratification => {
626                                bail!("Ratifications are currently not supported in the BFT.")
627                            }
628                        }
629                        // If the transmission already exists in the map, skip it.
630                        if transmissions.contains_key(transmission_id) {
631                            continue;
632                        }
633                        // If the transmission already exists in the ledger, skip it.
634                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
635                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
636                            continue;
637                        }
638                        // Retrieve the transmission.
639                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
640                            bail!(
641                                "BFT failed to retrieve transmission '{}.{}' from round {}",
642                                fmt_id(transmission_id),
643                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
644                                certificate.round()
645                            );
646                        };
647                        // Insert the transaction ID or solution ID into the map.
648                        match transmission_id {
649                            TransmissionID::Solution(id, _) => {
650                                seen_solution_ids.insert(id);
651                            }
652                            TransmissionID::Transaction(id, _) => {
653                                seen_transaction_ids.insert(id);
654                            }
655                            TransmissionID::Ratification => {}
656                        }
657                        // Add the transmission to the set.
658                        transmissions.insert(*transmission_id, transmission);
659                    }
660                }
661                // Trigger consensus, as this will build a new block for the ledger.
662                // Construct the subdag.
663                let subdag = Subdag::from(commit_subdag.clone())?;
664                // Retrieve the anchor round.
665                let anchor_round = subdag.anchor_round();
666                // Retrieve the number of transmissions.
667                let num_transmissions = transmissions.len();
668                // Retrieve metadata about the subdag.
669                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
670
671                // Ensure the subdag anchor round matches the leader round.
672                ensure!(
673                    anchor_round == leader_round,
674                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
675                );
676
677                // Trigger consensus.
678                if let Some(consensus_sender) = self.consensus_sender.get() {
679                    // Initialize a callback sender and receiver.
680                    let (callback_sender, callback_receiver) = oneshot::channel();
681                    // Send the subdag and transmissions to consensus.
682                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
683                    // Await the callback to continue.
684                    match callback_receiver.await {
685                        Ok(Ok(())) => (), // continue
686                        Ok(Err(e)) => {
687                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
688                            return Ok(());
689                        }
690                        Err(e) => {
691                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
692                            return Ok(());
693                        }
694                    }
695                }
696
697                info!(
698                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
699                );
700            }
701
702            // Update the DAG, as the subdag was successfully included into a block.
703            let mut dag_write = self.dag.write();
704            for certificate in commit_subdag.values().flatten() {
705                dag_write.commit(certificate, self.storage().max_gc_rounds());
706            }
707        }
708
709        // Perform garbage collection based on the latest committed leader round.
710        self.storage().garbage_collect_certificates(latest_leader_round);
711
712        Ok(())
713    }
714
715    /// Returns the subdag of batch certificates to commit.
716    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
717        &self,
718        leader_certificate: BatchCertificate<N>,
719    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
720        // Initialize a map for the certificates to commit.
721        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
722        // Initialize a set for the already ordered certificates.
723        let mut already_ordered = HashSet::new();
724        // Initialize a buffer for the certificates to order.
725        let mut buffer = vec![leader_certificate];
726        // Iterate over the certificates to order.
727        while let Some(certificate) = buffer.pop() {
728            // Insert the certificate into the map.
729            commit.entry(certificate.round()).or_default().insert(certificate.clone());
730
731            // Check if the previous certificate is below the GC round.
732            let previous_round = certificate.round().saturating_sub(1);
733            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
734                continue;
735            }
736            // Iterate over the previous certificate IDs.
737            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
738            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
739            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
740                // If the previous certificate is already ordered, continue.
741                if already_ordered.contains(previous_certificate_id) {
742                    continue;
743                }
744                // If the previous certificate was recently committed, continue.
745                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
746                    continue;
747                }
748                // If the previous certificate already exists in the ledger, continue.
749                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
750                    continue;
751                }
752
753                // Retrieve the previous certificate.
754                let previous_certificate = {
755                    // Start by retrieving the previous certificate from the DAG.
756                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
757                        // If the previous certificate is found, return it.
758                        Some(previous_certificate) => previous_certificate,
759                        // If the previous certificate is not found, retrieve it from the storage.
760                        None => match self.storage().get_certificate(*previous_certificate_id) {
761                            // If the previous certificate is found, return it.
762                            Some(previous_certificate) => previous_certificate,
763                            // Otherwise, the previous certificate is missing, and throw an error.
764                            None => bail!(
765                                "Missing previous certificate {} for round {previous_round}",
766                                fmt_id(previous_certificate_id)
767                            ),
768                        },
769                    }
770                };
771                // Insert the previous certificate into the set of already ordered certificates.
772                already_ordered.insert(previous_certificate.id());
773                // Insert the previous certificate into the buffer.
774                buffer.push(previous_certificate);
775            }
776        }
777        // Ensure we only retain certificates that are above the GC round.
778        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
779        // Return the certificates to commit.
780        Ok(commit)
781    }
782
783    /// Returns `true` if there is a path from the previous certificate to the current certificate.
784    fn is_linked(
785        &self,
786        previous_certificate: BatchCertificate<N>,
787        current_certificate: BatchCertificate<N>,
788    ) -> Result<bool> {
789        // Initialize the list containing the traversal.
790        let mut traversal = vec![current_certificate.clone()];
791        // Iterate over the rounds from the current certificate to the previous certificate.
792        for round in (previous_certificate.round()..current_certificate.round()).rev() {
793            // Retrieve all of the certificates for this past round.
794            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
795                // This is a critical error, as the traversal should have these certificates.
796                // If this error is hit, it is likely that the maximum GC rounds should be increased.
797                bail!("BFT failed to retrieve the certificates for past round {round}");
798            };
799            // Filter the certificates to only include those that are in the traversal.
800            traversal = certificates
801                .into_values()
802                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
803                .collect();
804        }
805        Ok(traversal.contains(&previous_certificate))
806    }
807}
808
809impl<N: Network> BFT<N> {
810    /// Starts the BFT handlers.
811    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
812        let BFTReceiver {
813            mut rx_primary_round,
814            mut rx_primary_certificate,
815            mut rx_sync_bft_dag_at_bootup,
816            mut rx_sync_bft,
817        } = bft_receiver;
818
819        // Process the current round from the primary.
820        let self_ = self.clone();
821        self.spawn(async move {
822            while let Some((current_round, callback)) = rx_primary_round.recv().await {
823                callback.send(self_.update_to_next_round(current_round)).ok();
824            }
825        });
826
827        // Process the certificate from the primary.
828        let self_ = self.clone();
829        self.spawn(async move {
830            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
831                // Update the DAG with the certificate.
832                let result = self_.update_dag::<true, false>(certificate).await;
833                // Send the callback **after** updating the DAG.
834                // Note: We must await the DAG update before proceeding.
835                callback.send(result).ok();
836            }
837        });
838
839        // Process the request to sync the BFT DAG at bootup.
840        let self_ = self.clone();
841        self.spawn(async move {
842            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
843                self_.sync_bft_dag_at_bootup(certificates).await;
844            }
845        });
846
847        // Process the request to sync the BFT.
848        let self_ = self.clone();
849        self.spawn(async move {
850            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
851                // Update the DAG with the certificate.
852                let result = self_.update_dag::<true, true>(certificate).await;
853                // Send the callback **after** updating the DAG.
854                // Note: We must await the DAG update before proceeding.
855                callback.send(result).ok();
856            }
857        });
858    }
859
860    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
861    /// already exist in the ledger.
862    ///
863    /// This method commits all the certificates into the DAG.
864    /// Note that there is no need to insert the certificates into the DAG, because these certificates
865    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
866    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
867        // Acquire the BFT write lock.
868        let mut dag = self.dag.write();
869
870        // Commit all the certificates.
871        for certificate in certificates {
872            dag.commit(&certificate, self.storage().max_gc_rounds());
873        }
874    }
875
876    /// Spawns a task with the given future; it should only be used for long-running tasks.
877    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
878        self.handles.lock().push(tokio::spawn(future));
879    }
880
881    /// Shuts down the BFT.
882    pub async fn shut_down(&self) {
883        info!("Shutting down the BFT...");
884        // Acquire the lock.
885        let _lock = self.lock.lock().await;
886        // Shut down the primary.
887        self.primary.shut_down().await;
888        // Abort the tasks.
889        self.handles.lock().iter().for_each(|handle| handle.abort());
890    }
891}
892
893#[cfg(test)]
894mod tests {
895    use crate::{BFT, MAX_LEADER_CERTIFICATE_DELAY_IN_SECS, helpers::Storage};
896    use snarkos_account::Account;
897    use snarkos_node_bft_ledger_service::MockLedgerService;
898    use snarkos_node_bft_storage_service::BFTMemoryService;
899    use snarkvm::{
900        console::account::{Address, PrivateKey},
901        ledger::{
902            committee::Committee,
903            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
904        },
905        utilities::TestRng,
906    };
907
908    use anyhow::Result;
909    use indexmap::{IndexMap, IndexSet};
910    use std::sync::Arc;
911
912    type CurrentNetwork = snarkvm::console::network::MainnetV0;
913
914    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
915    fn sample_test_instance(
916        committee_round: Option<u64>,
917        max_gc_rounds: u64,
918        rng: &mut TestRng,
919    ) -> (
920        Committee<CurrentNetwork>,
921        Account<CurrentNetwork>,
922        Arc<MockLedgerService<CurrentNetwork>>,
923        Storage<CurrentNetwork>,
924    ) {
925        let committee = match committee_round {
926            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
927            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
928        };
929        let account = Account::new(rng).unwrap();
930        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
931        let transmissions = Arc::new(BFTMemoryService::new());
932        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
933
934        (committee, account, ledger, storage)
935    }
936
937    #[test]
938    #[tracing_test::traced_test]
939    fn test_is_leader_quorum_odd() -> Result<()> {
940        let rng = &mut TestRng::default();
941
942        // Sample batch certificates.
943        let mut certificates = IndexSet::new();
944        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
945        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
946        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
947        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
948
949        // Initialize the committee.
950        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
951            1,
952            vec![
953                certificates[0].author(),
954                certificates[1].author(),
955                certificates[2].author(),
956                certificates[3].author(),
957            ],
958            rng,
959        );
960
961        // Initialize the ledger.
962        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
963        // Initialize the storage.
964        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
965        // Initialize the account.
966        let account = Account::new(rng)?;
967        // Initialize the BFT.
968        let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
969        assert!(bft.is_timer_expired());
970        // Ensure this call succeeds on an odd round.
971        let result = bft.is_leader_quorum_or_nonleaders_available(1);
972        // If timer has expired but quorum threshold is not reached, return 'false'.
973        assert!(!result);
974        // Insert certificates into storage.
975        for certificate in certificates.iter() {
976            storage.testing_only_insert_certificate_testing_only(certificate.clone());
977        }
978        // Ensure this call succeeds on an odd round.
979        let result = bft.is_leader_quorum_or_nonleaders_available(1);
980        assert!(result); // no previous leader certificate
981        // Set the leader certificate.
982        let leader_certificate = sample_batch_certificate(rng);
983        *bft.leader_certificate.write() = Some(leader_certificate);
984        // Ensure this call succeeds on an odd round.
985        let result = bft.is_leader_quorum_or_nonleaders_available(1);
986        assert!(result); // should now fall through to the end of function
987
988        Ok(())
989    }
990
991    #[test]
992    #[tracing_test::traced_test]
993    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
994        let rng = &mut TestRng::default();
995
996        // Sample the test instance.
997        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
998        assert_eq!(committee.starting_round(), 1);
999        assert_eq!(storage.current_round(), 1);
1000        assert_eq!(storage.max_gc_rounds(), 10);
1001
1002        // Initialize the BFT.
1003        let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1004        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1005
1006        // Store is at round 1, and we are checking for round 2.
1007        // Ensure this call fails on an even round.
1008        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1009        assert!(!result);
1010        Ok(())
1011    }
1012
1013    #[test]
1014    #[tracing_test::traced_test]
1015    fn test_is_leader_quorum_even() -> Result<()> {
1016        let rng = &mut TestRng::default();
1017
1018        // Sample the test instance.
1019        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1020        assert_eq!(committee.starting_round(), 2);
1021        assert_eq!(storage.current_round(), 2);
1022        assert_eq!(storage.max_gc_rounds(), 10);
1023
1024        // Initialize the BFT.
1025        let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1026        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1027
1028        // Ensure this call fails on an even round.
1029        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1030        assert!(!result);
1031        Ok(())
1032    }
1033
1034    #[test]
1035    #[tracing_test::traced_test]
1036    fn test_is_even_round_ready() -> Result<()> {
1037        let rng = &mut TestRng::default();
1038
1039        // Sample batch certificates.
1040        let mut certificates = IndexSet::new();
1041        certificates.insert(sample_batch_certificate_for_round(2, rng));
1042        certificates.insert(sample_batch_certificate_for_round(2, rng));
1043        certificates.insert(sample_batch_certificate_for_round(2, rng));
1044        certificates.insert(sample_batch_certificate_for_round(2, rng));
1045
1046        // Initialize the committee.
1047        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1048            2,
1049            vec![
1050                certificates[0].author(),
1051                certificates[1].author(),
1052                certificates[2].author(),
1053                certificates[3].author(),
1054            ],
1055            rng,
1056        );
1057
1058        // Initialize the ledger.
1059        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1060        // Initialize the storage.
1061        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1062        // Initialize the account.
1063        let account = Account::new(rng)?;
1064        // Initialize the BFT.
1065        let bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
1066        // Set the leader certificate.
1067        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1068        *bft.leader_certificate.write() = Some(leader_certificate);
1069        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1070        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1071        assert!(!result);
1072        // Once quorum threshold is reached, we are ready for the next round.
1073        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1074        assert!(result);
1075
1076        // Initialize a new BFT.
1077        let bft_timer = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
1078        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1079        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1080        if !bft_timer.is_timer_expired() {
1081            assert!(!result);
1082        }
1083        // Wait for the timer to expire.
1084        let leader_certificate_timeout =
1085            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1086        std::thread::sleep(leader_certificate_timeout);
1087        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1088        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1089        if bft_timer.is_timer_expired() {
1090            assert!(result);
1091        } else {
1092            assert!(!result);
1093        }
1094
1095        Ok(())
1096    }
1097
1098    #[test]
1099    #[tracing_test::traced_test]
1100    fn test_update_leader_certificate_odd() -> Result<()> {
1101        let rng = &mut TestRng::default();
1102
1103        // Sample the test instance.
1104        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1105        assert_eq!(storage.max_gc_rounds(), 10);
1106
1107        // Initialize the BFT.
1108        let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1109
1110        // Ensure this call fails on an odd round.
1111        let result = bft.update_leader_certificate_to_even_round(1);
1112        assert!(!result);
1113        Ok(())
1114    }
1115
1116    #[test]
1117    #[tracing_test::traced_test]
1118    fn test_update_leader_certificate_bad_round() -> Result<()> {
1119        let rng = &mut TestRng::default();
1120
1121        // Sample the test instance.
1122        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1123        assert_eq!(storage.max_gc_rounds(), 10);
1124
1125        // Initialize the BFT.
1126        let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1127
1128        // Ensure this call succeeds on an even round.
1129        let result = bft.update_leader_certificate_to_even_round(6);
1130        assert!(!result);
1131        Ok(())
1132    }
1133
1134    #[test]
1135    #[tracing_test::traced_test]
1136    fn test_update_leader_certificate_even() -> Result<()> {
1137        let rng = &mut TestRng::default();
1138
1139        // Set the current round.
1140        let current_round = 3;
1141
1142        // Sample the certificates.
1143        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1144            current_round,
1145            rng,
1146        );
1147
1148        // Initialize the committee.
1149        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1150            2,
1151            vec![
1152                certificates[0].author(),
1153                certificates[1].author(),
1154                certificates[2].author(),
1155                certificates[3].author(),
1156            ],
1157            rng,
1158        );
1159
1160        // Initialize the ledger.
1161        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1162
1163        // Initialize the storage.
1164        let transmissions = Arc::new(BFTMemoryService::new());
1165        let storage = Storage::new(ledger.clone(), transmissions, 10);
1166        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1167        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1168        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1169        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1170        assert_eq!(storage.current_round(), 2);
1171
1172        // Retrieve the leader certificate.
1173        let leader = committee.get_leader(2).unwrap();
1174        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1175
1176        // Initialize the BFT.
1177        let account = Account::new(rng)?;
1178        let bft = BFT::new(account, storage.clone(), ledger, None, &[], None)?;
1179
1180        // Set the leader certificate.
1181        *bft.leader_certificate.write() = Some(leader_certificate);
1182
1183        // Update the leader certificate.
1184        // Ensure this call succeeds on an even round.
1185        let result = bft.update_leader_certificate_to_even_round(2);
1186        assert!(result);
1187
1188        Ok(())
1189    }
1190
1191    #[tokio::test]
1192    #[tracing_test::traced_test]
1193    async fn test_order_dag_with_dfs() -> Result<()> {
1194        let rng = &mut TestRng::default();
1195
1196        // Sample the test instance.
1197        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1198
1199        // Initialize the round parameters.
1200        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1201        let current_round = previous_round + 1;
1202
1203        // Sample the current certificate and previous certificates.
1204        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1205            current_round,
1206            rng,
1207        );
1208
1209        /* Test GC */
1210
1211        // Ensure the function succeeds in returning only certificates above GC.
1212        {
1213            // Initialize the storage.
1214            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1215            // Initialize the BFT.
1216            let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
1217
1218            // Insert a mock DAG in the BFT.
1219            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1220
1221            // Insert the previous certificates into the BFT.
1222            for certificate in previous_certificates.clone() {
1223                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1224            }
1225
1226            // Ensure this call succeeds and returns all given certificates.
1227            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1228            assert!(result.is_ok());
1229            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1230            assert_eq!(candidate_certificates.len(), 1);
1231            let expected_certificates = vec![certificate.clone()];
1232            assert_eq!(
1233                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1234                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1235            );
1236            assert_eq!(candidate_certificates, expected_certificates);
1237        }
1238
1239        /* Test normal case */
1240
1241        // Ensure the function succeeds in returning all given certificates.
1242        {
1243            // Initialize the storage.
1244            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1245            // Initialize the BFT.
1246            let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1247
1248            // Insert a mock DAG in the BFT.
1249            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1250
1251            // Insert the previous certificates into the BFT.
1252            for certificate in previous_certificates.clone() {
1253                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1254            }
1255
1256            // Ensure this call succeeds and returns all given certificates.
1257            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1258            assert!(result.is_ok());
1259            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1260            assert_eq!(candidate_certificates.len(), 5);
1261            let expected_certificates = vec![
1262                previous_certificates[0].clone(),
1263                previous_certificates[1].clone(),
1264                previous_certificates[2].clone(),
1265                previous_certificates[3].clone(),
1266                certificate,
1267            ];
1268            assert_eq!(
1269                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1270                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1271            );
1272            assert_eq!(candidate_certificates, expected_certificates);
1273        }
1274
1275        Ok(())
1276    }
1277
1278    #[test]
1279    #[tracing_test::traced_test]
1280    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1281        let rng = &mut TestRng::default();
1282
1283        // Sample the test instance.
1284        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1285        assert_eq!(committee.starting_round(), 1);
1286        assert_eq!(storage.current_round(), 1);
1287        assert_eq!(storage.max_gc_rounds(), 1);
1288
1289        // Initialize the round parameters.
1290        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1291        let current_round = previous_round + 1;
1292
1293        // Sample the current certificate and previous certificates.
1294        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1295            current_round,
1296            rng,
1297        );
1298        // Construct the previous certificate IDs.
1299        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1300
1301        /* Test missing previous certificate. */
1302
1303        // Initialize the BFT.
1304        let bft = BFT::new(account, storage, ledger, None, &[], None)?;
1305
1306        // The expected error message.
1307        let error_msg = format!(
1308            "Missing previous certificate {} for round {previous_round}",
1309            crate::helpers::fmt_id(previous_certificate_ids[3]),
1310        );
1311
1312        // Ensure this call fails on a missing previous certificate.
1313        let result = bft.order_dag_with_dfs::<false>(certificate);
1314        assert!(result.is_err());
1315        assert_eq!(result.unwrap_err().to_string(), error_msg);
1316        Ok(())
1317    }
1318
1319    #[tokio::test]
1320    #[tracing_test::traced_test]
1321    async fn test_bft_gc_on_commit() -> Result<()> {
1322        let rng = &mut TestRng::default();
1323
1324        // Initialize the round parameters.
1325        let max_gc_rounds = 1;
1326        let committee_round = 0;
1327        let commit_round = 2;
1328        let current_round = commit_round + 1;
1329
1330        // Sample the certificates.
1331        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1332            current_round,
1333            rng,
1334        );
1335
1336        // Initialize the committee.
1337        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1338            committee_round,
1339            vec![
1340                certificates[0].author(),
1341                certificates[1].author(),
1342                certificates[2].author(),
1343                certificates[3].author(),
1344            ],
1345            rng,
1346        );
1347
1348        // Initialize the ledger.
1349        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1350
1351        // Initialize the storage.
1352        let transmissions = Arc::new(BFTMemoryService::new());
1353        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1354        // Insert the certificates into the storage.
1355        for certificate in certificates.iter() {
1356            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1357        }
1358
1359        // Get the leader certificate.
1360        let leader = committee.get_leader(commit_round).unwrap();
1361        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1362
1363        // Initialize the BFT.
1364        let account = Account::new(rng)?;
1365        let bft = BFT::new(account, storage.clone(), ledger, None, &[], None)?;
1366        // Insert a mock DAG in the BFT.
1367        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1368
1369        // Ensure that the `gc_round` has not been updated yet.
1370        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1371
1372        // Insert the certificates into the BFT.
1373        for certificate in certificates {
1374            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1375        }
1376
1377        // Commit the leader certificate.
1378        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1379
1380        // Ensure that the `gc_round` has been updated.
1381        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1382
1383        Ok(())
1384    }
1385
1386    #[tokio::test]
1387    #[tracing_test::traced_test]
1388    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1389        let rng = &mut TestRng::default();
1390
1391        // Initialize the round parameters.
1392        let max_gc_rounds = 1;
1393        let committee_round = 0;
1394        let commit_round = 2;
1395        let current_round = commit_round + 1;
1396
1397        // Sample the current certificate and previous certificates.
1398        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1399            current_round,
1400            rng,
1401        );
1402
1403        // Initialize the committee.
1404        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1405            committee_round,
1406            vec![
1407                certificates[0].author(),
1408                certificates[1].author(),
1409                certificates[2].author(),
1410                certificates[3].author(),
1411            ],
1412            rng,
1413        );
1414
1415        // Initialize the ledger.
1416        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1417
1418        // Initialize the storage.
1419        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1420        // Insert the certificates into the storage.
1421        for certificate in certificates.iter() {
1422            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1423        }
1424
1425        // Get the leader certificate.
1426        let leader = committee.get_leader(commit_round).unwrap();
1427        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1428
1429        // Initialize the BFT.
1430        let account = Account::new(rng)?;
1431        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
1432
1433        // Insert a mock DAG in the BFT.
1434        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1435
1436        // Insert the previous certificates into the BFT.
1437        for certificate in certificates.clone() {
1438            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1439        }
1440
1441        // Commit the leader certificate.
1442        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1443
1444        // Simulate a bootup of the BFT.
1445
1446        // Initialize a new instance of storage.
1447        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1448        // Initialize a new instance of BFT.
1449        let bootup_bft = BFT::new(account, storage_2, ledger, None, &[], None)?;
1450
1451        // Sync the BFT DAG at bootup.
1452        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1453
1454        // Check that the BFT starts from the same last committed round.
1455        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1456
1457        // Ensure that both BFTs have committed the leader certificate.
1458        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1459        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1460
1461        // Check the state of the bootup BFT.
1462        for certificate in certificates {
1463            let certificate_round = certificate.round();
1464            let certificate_id = certificate.id();
1465            // Check that the bootup BFT has committed the certificates.
1466            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1467            // Check that the bootup BFT does not contain the certificates in its graph, because
1468            // it should not need to order them again in subsequent subdags.
1469            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1470        }
1471
1472        Ok(())
1473    }
1474
1475    #[tokio::test]
1476    #[tracing_test::traced_test]
1477    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1478        /*
1479        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1480        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1481        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1482        */
1483
1484        let rng = &mut TestRng::default();
1485
1486        // Initialize the round parameters.
1487        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1488        let committee_round = 0;
1489        let commit_round = 2;
1490        let current_round = commit_round + 1;
1491        let next_round = current_round + 1;
1492
1493        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1494        let (round_to_certificates_map, committee) = {
1495            let private_keys = vec![
1496                PrivateKey::new(rng).unwrap(),
1497                PrivateKey::new(rng).unwrap(),
1498                PrivateKey::new(rng).unwrap(),
1499                PrivateKey::new(rng).unwrap(),
1500            ];
1501            let addresses = vec![
1502                Address::try_from(private_keys[0])?,
1503                Address::try_from(private_keys[1])?,
1504                Address::try_from(private_keys[2])?,
1505                Address::try_from(private_keys[3])?,
1506            ];
1507            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1508                committee_round,
1509                addresses,
1510                rng,
1511            );
1512            // Initialize a mapping from the round number to the set of batch certificates in the round.
1513            let mut round_to_certificates_map: IndexMap<
1514                u64,
1515                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1516            > = IndexMap::new();
1517            let mut previous_certificates = IndexSet::with_capacity(4);
1518            // Initialize the genesis batch certificates.
1519            for _ in 0..4 {
1520                previous_certificates.insert(sample_batch_certificate(rng));
1521            }
1522            for round in 0..commit_round + 3 {
1523                let mut current_certificates = IndexSet::new();
1524                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1525                    IndexSet::new()
1526                } else {
1527                    previous_certificates.iter().map(|c| c.id()).collect()
1528                };
1529                let transmission_ids =
1530                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1531                        .into_iter()
1532                        .collect::<IndexSet<_>>();
1533                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1534                let committee_id = committee.id();
1535                for (i, private_key_1) in private_keys.iter().enumerate() {
1536                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1537                        private_key_1,
1538                        round,
1539                        timestamp,
1540                        committee_id,
1541                        transmission_ids.clone(),
1542                        previous_certificate_ids.clone(),
1543                        rng,
1544                    )
1545                    .unwrap();
1546                    let mut signatures = IndexSet::with_capacity(4);
1547                    for (j, private_key_2) in private_keys.iter().enumerate() {
1548                        if i != j {
1549                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1550                        }
1551                    }
1552                    let certificate =
1553                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1554                    current_certificates.insert(certificate);
1555                }
1556                // Update the mapping.
1557                round_to_certificates_map.insert(round, current_certificates.clone());
1558                previous_certificates = current_certificates.clone();
1559            }
1560            (round_to_certificates_map, committee)
1561        };
1562
1563        // Initialize the ledger.
1564        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1565        // Initialize the storage.
1566        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1567        // Get the leaders for the next 2 commit rounds.
1568        let leader = committee.get_leader(commit_round).unwrap();
1569        let next_leader = committee.get_leader(next_round).unwrap();
1570        // Insert the pre shutdown certificates into the storage.
1571        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1572        for i in 1..=commit_round {
1573            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1574            if i == commit_round {
1575                // Only insert the leader certificate for the commit round.
1576                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1577                if let Some(c) = leader_certificate {
1578                    pre_shutdown_certificates.push(c.clone());
1579                }
1580                continue;
1581            }
1582            pre_shutdown_certificates.extend(certificates);
1583        }
1584        for certificate in pre_shutdown_certificates.iter() {
1585            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1586        }
1587        // Insert the post shutdown certificates into the storage.
1588        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1589            Vec::new();
1590        for j in commit_round..=commit_round + 2 {
1591            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1592            post_shutdown_certificates.extend(certificate);
1593        }
1594        for certificate in post_shutdown_certificates.iter() {
1595            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1596        }
1597        // Get the leader certificates.
1598        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1599        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1600
1601        // Initialize the BFT without bootup.
1602        let account = Account::new(rng)?;
1603        let bft = BFT::new(account.clone(), storage, ledger.clone(), None, &[], None)?;
1604
1605        // Insert a mock DAG in the BFT without bootup.
1606        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1607
1608        // Insert the certificates into the BFT without bootup.
1609        for certificate in pre_shutdown_certificates.clone() {
1610            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1611        }
1612
1613        // Insert the post shutdown certificates into the BFT without bootup.
1614        for certificate in post_shutdown_certificates.clone() {
1615            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1616        }
1617        // Commit the second leader certificate.
1618        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1619        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1620        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1621
1622        // Simulate a bootup of the BFT.
1623
1624        // Initialize a new instance of storage.
1625        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1626
1627        // Initialize a new instance of BFT with bootup.
1628        let bootup_bft = BFT::new(account, bootup_storage.clone(), ledger.clone(), None, &[], None)?;
1629
1630        // Sync the BFT DAG at bootup.
1631        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1632
1633        // Insert the post shutdown certificates to the storage and BFT with bootup.
1634        for certificate in post_shutdown_certificates.iter() {
1635            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1636        }
1637        for certificate in post_shutdown_certificates.clone() {
1638            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1639        }
1640        // Commit the second leader certificate.
1641        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1642        let commit_subdag_metadata_bootup =
1643            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1644        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1645        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1646
1647        // Check that the final state of both BFTs is the same.
1648
1649        // Check that both BFTs start from the same last committed round.
1650        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1651
1652        // Ensure that both BFTs have committed the leader certificates.
1653        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1654        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1655        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1656        assert!(
1657            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1658        );
1659
1660        // Check that the bootup BFT has committed the pre shutdown certificates.
1661        for certificate in pre_shutdown_certificates.clone() {
1662            let certificate_round = certificate.round();
1663            let certificate_id = certificate.id();
1664            // Check that both BFTs have committed the certificates.
1665            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1666            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1667            // Check that the bootup BFT does not contain the certificates in its graph, because
1668            // it should not need to order them again in subsequent subdags.
1669            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1670            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1671        }
1672
1673        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1674        for certificate in committed_certificates_bootup.clone() {
1675            let certificate_round = certificate.round();
1676            let certificate_id = certificate.id();
1677            // Check that the both BFTs have committed the certificates.
1678            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1679            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1680            // Check that the bootup BFT does not contain the certificates in its graph, because
1681            // it should not need to order them again in subsequent subdags.
1682            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1683            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1684        }
1685
1686        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1687        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1688
1689        Ok(())
1690    }
1691
1692    #[tokio::test]
1693    #[tracing_test::traced_test]
1694    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1695        /*
1696        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1697        2. Add post shutdown certificates to the bootup BFT.
1698        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1699        */
1700
1701        let rng = &mut TestRng::default();
1702
1703        // Initialize the round parameters.
1704        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1705        let committee_round = 0;
1706        let commit_round = 2;
1707        let current_round = commit_round + 1;
1708        let next_round = current_round + 1;
1709
1710        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1711        let (round_to_certificates_map, committee) = {
1712            let private_keys = vec![
1713                PrivateKey::new(rng).unwrap(),
1714                PrivateKey::new(rng).unwrap(),
1715                PrivateKey::new(rng).unwrap(),
1716                PrivateKey::new(rng).unwrap(),
1717            ];
1718            let addresses = vec![
1719                Address::try_from(private_keys[0])?,
1720                Address::try_from(private_keys[1])?,
1721                Address::try_from(private_keys[2])?,
1722                Address::try_from(private_keys[3])?,
1723            ];
1724            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1725                committee_round,
1726                addresses,
1727                rng,
1728            );
1729            // Initialize a mapping from the round number to the set of batch certificates in the round.
1730            let mut round_to_certificates_map: IndexMap<
1731                u64,
1732                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1733            > = IndexMap::new();
1734            let mut previous_certificates = IndexSet::with_capacity(4);
1735            // Initialize the genesis batch certificates.
1736            for _ in 0..4 {
1737                previous_certificates.insert(sample_batch_certificate(rng));
1738            }
1739            for round in 0..=commit_round + 2 {
1740                let mut current_certificates = IndexSet::new();
1741                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1742                    IndexSet::new()
1743                } else {
1744                    previous_certificates.iter().map(|c| c.id()).collect()
1745                };
1746                let transmission_ids =
1747                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1748                        .into_iter()
1749                        .collect::<IndexSet<_>>();
1750                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1751                let committee_id = committee.id();
1752                for (i, private_key_1) in private_keys.iter().enumerate() {
1753                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1754                        private_key_1,
1755                        round,
1756                        timestamp,
1757                        committee_id,
1758                        transmission_ids.clone(),
1759                        previous_certificate_ids.clone(),
1760                        rng,
1761                    )
1762                    .unwrap();
1763                    let mut signatures = IndexSet::with_capacity(4);
1764                    for (j, private_key_2) in private_keys.iter().enumerate() {
1765                        if i != j {
1766                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1767                        }
1768                    }
1769                    let certificate =
1770                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1771                    current_certificates.insert(certificate);
1772                }
1773                // Update the mapping.
1774                round_to_certificates_map.insert(round, current_certificates.clone());
1775                previous_certificates = current_certificates.clone();
1776            }
1777            (round_to_certificates_map, committee)
1778        };
1779
1780        // Initialize the ledger.
1781        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1782        // Initialize the storage.
1783        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1784        // Get the leaders for the next 2 commit rounds.
1785        let leader = committee.get_leader(commit_round).unwrap();
1786        let next_leader = committee.get_leader(next_round).unwrap();
1787        // Insert the pre shutdown certificates into the storage.
1788        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1789        for i in 1..=commit_round {
1790            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1791            if i == commit_round {
1792                // Only insert the leader certificate for the commit round.
1793                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1794                if let Some(c) = leader_certificate {
1795                    pre_shutdown_certificates.push(c.clone());
1796                }
1797                continue;
1798            }
1799            pre_shutdown_certificates.extend(certificates);
1800        }
1801        for certificate in pre_shutdown_certificates.iter() {
1802            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1803        }
1804        // Initialize the bootup BFT.
1805        let account = Account::new(rng)?;
1806        let bootup_bft = BFT::new(account.clone(), storage.clone(), ledger.clone(), None, &[], None)?;
1807        // Insert a mock DAG in the BFT without bootup.
1808        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1809        // Sync the BFT DAG at bootup.
1810        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1811
1812        // Insert the post shutdown certificates into the storage.
1813        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1814            Vec::new();
1815        for j in commit_round..=commit_round + 2 {
1816            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1817            post_shutdown_certificates.extend(certificate);
1818        }
1819        for certificate in post_shutdown_certificates.iter() {
1820            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1821        }
1822
1823        // Insert the post shutdown certificates into the DAG.
1824        for certificate in post_shutdown_certificates.clone() {
1825            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1826        }
1827
1828        // Get the next leader certificate to commit.
1829        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1830        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1831        let committed_certificates = commit_subdag.values().flatten();
1832
1833        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1834        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1835            for committed_certificate in committed_certificates.clone() {
1836                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1837            }
1838        }
1839        Ok(())
1840    }
1841}