amareleo_node_bft/
bft.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_node_bft_ledger_service::LedgerService;
34use snarkvm::{
35    console::account::Address,
36    ledger::{
37        block::Transaction,
38        committee::Committee,
39        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
40        puzzle::{Solution, SolutionID},
41    },
42    prelude::{Field, Network, Result, bail, ensure},
43};
44
45use colored::Colorize;
46use indexmap::{IndexMap, IndexSet};
47use parking_lot::{Mutex, RwLock};
48use std::{
49    collections::{BTreeMap, HashSet},
50    future::Future,
51    sync::{
52        Arc,
53        atomic::{AtomicI64, Ordering},
54    },
55};
56use tokio::{
57    sync::{Mutex as TMutex, OnceCell, oneshot},
58    task::JoinHandle,
59};
60
61#[derive(Clone)]
62pub struct BFT<N: Network> {
63    /// The primary.
64    primary: Primary<N>,
65    /// The DAG.
66    dag: Arc<RwLock<DAG<N>>>,
67    /// The batch certificate of the leader from the current even round, if one was present.
68    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
69    /// The timer for the leader certificate to be received.
70    leader_certificate_timer: Arc<AtomicI64>,
71    /// The consensus sender.
72    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
73    /// The spawned handles.
74    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
75    /// The BFT lock.
76    lock: Arc<TMutex<()>>,
77}
78
79impl<N: Network> BFT<N> {
80    /// Initializes a new instance of the BFT.
81    pub fn new(
82        account: Account<N>,
83        storage: Storage<N>,
84        keep_state: bool,
85        storage_mode: StorageMode,
86        ledger: Arc<dyn LedgerService<N>>,
87    ) -> Result<Self> {
88        Ok(Self {
89            primary: Primary::new(account, storage, keep_state, storage_mode, ledger)?,
90            dag: Default::default(),
91            leader_certificate: Default::default(),
92            leader_certificate_timer: Default::default(),
93            consensus_sender: Default::default(),
94            handles: Default::default(),
95            lock: Default::default(),
96        })
97    }
98
99    /// Run the BFT instance.
100    pub async fn run(
101        &mut self,
102        consensus_sender: Option<ConsensusSender<N>>,
103        primary_sender: PrimarySender<N>,
104        primary_receiver: PrimaryReceiver<N>,
105    ) -> Result<()> {
106        info!("Starting the BFT instance...");
107        // Initialize the BFT channels.
108        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
109        // First, start the BFT handlers.
110        self.start_handlers(bft_receiver);
111        // Next, run the primary instance.
112        self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
113        // Lastly, set the consensus sender.
114        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
115        if let Some(consensus_sender) = consensus_sender {
116            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
117        }
118        Ok(())
119    }
120
121    /// Returns `true` if the primary is synced.
122    pub fn is_synced(&self) -> bool {
123        self.primary.is_synced()
124    }
125
126    /// Returns the primary.
127    pub const fn primary(&self) -> &Primary<N> {
128        &self.primary
129    }
130
131    /// Returns the storage.
132    pub const fn storage(&self) -> &Storage<N> {
133        self.primary.storage()
134    }
135
136    /// Returns the ledger.
137    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
138        self.primary.ledger()
139    }
140
141    /// Returns the leader of the current even round, if one was present.
142    pub fn leader(&self) -> Option<Address<N>> {
143        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
144    }
145
146    /// Returns the certificate of the leader from the current even round, if one was present.
147    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
148        &self.leader_certificate
149    }
150}
151
152impl<N: Network> BFT<N> {
153    /// Returns the number of unconfirmed transmissions.
154    pub fn num_unconfirmed_transmissions(&self) -> usize {
155        self.primary.num_unconfirmed_transmissions()
156    }
157
158    /// Returns the number of unconfirmed ratifications.
159    pub fn num_unconfirmed_ratifications(&self) -> usize {
160        self.primary.num_unconfirmed_ratifications()
161    }
162
163    /// Returns the number of solutions.
164    pub fn num_unconfirmed_solutions(&self) -> usize {
165        self.primary.num_unconfirmed_solutions()
166    }
167
168    /// Returns the number of unconfirmed transactions.
169    pub fn num_unconfirmed_transactions(&self) -> usize {
170        self.primary.num_unconfirmed_transactions()
171    }
172}
173
174impl<N: Network> BFT<N> {
175    /// Returns the worker transmission IDs.
176    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
177        self.primary.worker_transmission_ids()
178    }
179
180    /// Returns the worker transmissions.
181    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
182        self.primary.worker_transmissions()
183    }
184
185    /// Returns the worker solutions.
186    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
187        self.primary.worker_solutions()
188    }
189
190    /// Returns the worker transactions.
191    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
192        self.primary.worker_transactions()
193    }
194}
195
196impl<N: Network> BFT<N> {
197    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
198    fn update_to_next_round(&self, current_round: u64) -> bool {
199        // Ensure the current round is at least the storage round (this is a sanity check).
200        let storage_round = self.storage().current_round();
201        if current_round < storage_round {
202            debug!(
203                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
204            );
205            return false;
206        }
207
208        // Determine if the BFT is ready to update to the next round.
209        let is_ready = match current_round % 2 == 0 {
210            true => self.update_leader_certificate_to_even_round(current_round),
211            false => self.is_leader_quorum_or_nonleaders_available(current_round),
212        };
213
214        #[cfg(feature = "metrics")]
215        {
216            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
217            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
218            if start > 0 {
219                let end = now();
220                let elapsed = std::time::Duration::from_secs((end - start) as u64);
221                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
222            }
223        }
224
225        // Log whether the round is going to update.
226        if current_round % 2 == 0 {
227            // Determine if there is a leader certificate.
228            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
229                // Ensure the state of the leader certificate is consistent with the BFT being ready.
230                if !is_ready {
231                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
232                }
233                // Log the leader election.
234                let leader_round = leader_certificate.round();
235                match leader_round == current_round {
236                    true => {
237                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
238                        #[cfg(feature = "metrics")]
239                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
240                    }
241                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
242                }
243            } else {
244                match is_ready {
245                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
246                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
247                }
248            }
249        }
250
251        // If the BFT is ready, then update to the next round.
252        if is_ready {
253            // Update to the next round in storage.
254            if let Err(e) = self.storage().increment_to_next_round(current_round) {
255                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
256                return false;
257            }
258            // Update the timer for the leader certificate.
259            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
260        }
261
262        is_ready
263    }
264
265    /// Updates the leader certificate to the current even round,
266    /// returning `true` if the BFT is ready to update to the next round.
267    ///
268    /// This method runs on every even round, by determining the leader of the current even round,
269    /// and setting the leader certificate to their certificate in the round, if they were present.
270    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
271        // Retrieve the current round.
272        let current_round = self.storage().current_round();
273        // Ensure the current round matches the given round.
274        if current_round != even_round {
275            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
276            return false;
277        }
278
279        // If the current round is odd, return false.
280        if current_round % 2 != 0 || current_round < 2 {
281            error!("BFT cannot update the leader certificate in an odd round");
282            return false;
283        }
284
285        // Retrieve the certificates for the current round.
286        let current_certificates = self.storage().get_certificates_for_round(current_round);
287        // If there are no current certificates, set the leader certificate to 'None', and return early.
288        if current_certificates.is_empty() {
289            // Set the leader certificate to 'None'.
290            *self.leader_certificate.write() = None;
291            return false;
292        }
293
294        // Retrieve the committee lookback of the current round.
295        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
296            Ok(committee) => committee,
297            Err(e) => {
298                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
299                return false;
300            }
301        };
302        // Determine the leader of the current round.
303        let leader = match self.ledger().latest_leader() {
304            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
305            _ => {
306                // Compute the leader for the current round.
307                let computed_leader = self.primary.account().address();
308                // let computed_leader = match committee_lookback.get_leader(current_round) {
309                //     Ok(leader) => leader,
310                //     Err(e) => {
311                //         error!("BFT failed to compute the leader for the even round {current_round} - {e}");
312                //         return false;
313                //     }
314                // };
315
316                // Cache the computed leader.
317                self.ledger().update_latest_leader(current_round, computed_leader);
318
319                computed_leader
320            }
321        };
322        // Find and set the leader certificate, if the leader was present in the current even round.
323        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
324        *self.leader_certificate.write() = leader_certificate.cloned();
325
326        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
327    }
328
329    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
330    ///  - If the leader certificate is set for the current even round.
331    ///  - The timer for the leader certificate has expired.
332    fn is_even_round_ready_for_next_round(
333        &self,
334        certificates: IndexSet<BatchCertificate<N>>,
335        committee: Committee<N>,
336        current_round: u64,
337    ) -> bool {
338        // Retrieve the authors for the current round.
339        let authors = certificates.into_iter().map(|c| c.author()).collect();
340        // Check if quorum threshold is reached.
341        if !committee.is_quorum_threshold_reached(&authors) {
342            trace!("BFT failed to reach quorum threshold in even round {current_round}");
343            return false;
344        }
345        // If the leader certificate is set for the current even round, return 'true'.
346        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
347            if leader_certificate.round() == current_round {
348                return true;
349            }
350        }
351        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
352        if self.is_timer_expired() {
353            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
354            return true;
355        }
356        // Otherwise, return 'false'.
357        false
358    }
359
360    /// Returns `true` if the timer for the leader certificate has expired.
361    fn is_timer_expired(&self) -> bool {
362        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
363    }
364
365    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
366    ///  - The leader certificate is `None`.
367    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
368    ///  - The leader certificate timer has expired.
369    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
370        // Retrieve the current round.
371        let current_round = self.storage().current_round();
372        // Ensure the current round matches the given round.
373        if current_round != odd_round {
374            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
375            return false;
376        }
377        // If the current round is even, return false.
378        if current_round % 2 != 1 {
379            error!("BFT does not compute stakes for the leader certificate in an even round");
380            return false;
381        }
382        // Retrieve the certificates for the current round.
383        let current_certificates = self.storage().get_certificates_for_round(current_round);
384        // Retrieve the committee lookback for the current round.
385        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
386            Ok(committee) => committee,
387            Err(e) => {
388                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
389                return false;
390            }
391        };
392        // Retrieve the authors of the current certificates.
393        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
394        // Check if quorum threshold is reached.
395        if !committee_lookback.is_quorum_threshold_reached(&authors) {
396            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
397            return false;
398        }
399        // Retrieve the leader certificate.
400        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
401            // If there is no leader certificate for the previous round, return 'true'.
402            return true;
403        };
404        // Compute the stake for the leader certificate.
405        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
406            leader_certificate.id(),
407            current_certificates,
408            &committee_lookback,
409        );
410        // Return 'true' if any of the following conditions hold:
411        stake_with_leader >= committee_lookback.availability_threshold()
412            || stake_without_leader >= committee_lookback.quorum_threshold()
413            || self.is_timer_expired()
414    }
415
416    /// Computes the amount of stake that has & has not signed for the leader certificate.
417    fn compute_stake_for_leader_certificate(
418        &self,
419        leader_certificate_id: Field<N>,
420        current_certificates: IndexSet<BatchCertificate<N>>,
421        current_committee: &Committee<N>,
422    ) -> (u64, u64) {
423        // If there are no current certificates, return early.
424        if current_certificates.is_empty() {
425            return (0, 0);
426        }
427
428        // Initialize a tracker for the stake with the leader.
429        let mut stake_with_leader = 0u64;
430        // Initialize a tracker for the stake without the leader.
431        let mut stake_without_leader = 0u64;
432        // Iterate over the current certificates.
433        for certificate in current_certificates {
434            // Retrieve the stake for the author of the certificate.
435            let stake = current_committee.get_stake(certificate.author());
436            // Determine if the certificate includes the leader.
437            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
438                // If the certificate includes the leader, add the stake to the stake with the leader.
439                true => stake_with_leader = stake_with_leader.saturating_add(stake),
440                // If the certificate does not include the leader, add the stake to the stake without the leader.
441                false => stake_without_leader = stake_without_leader.saturating_add(stake),
442            }
443        }
444        // Return the stake with the leader, and the stake without the leader.
445        (stake_with_leader, stake_without_leader)
446    }
447}
448
449impl<N: Network> BFT<N> {
450    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
451    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
452        &self,
453        certificate: BatchCertificate<N>,
454    ) -> Result<()> {
455        // Acquire the BFT lock.
456        let _lock = self.lock.lock().await;
457
458        // Retrieve the certificate round.
459        let certificate_round = certificate.round();
460        // Insert the certificate into the DAG.
461        self.dag.write().insert(certificate);
462
463        // Construct the commit round.
464        let commit_round = certificate_round.saturating_sub(1);
465        // If the commit round is odd, return early.
466        if commit_round % 2 != 0 || commit_round < 2 {
467            return Ok(());
468        }
469        // If the commit round is at or below the last committed round, return early.
470        if commit_round <= self.dag.read().last_committed_round() {
471            return Ok(());
472        }
473
474        /* Proceeding to check if the leader is ready to be committed. */
475        info!("Checking if the leader is ready to be committed for round {commit_round}...");
476
477        // Retrieve the committee lookback for the commit round.
478        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
479            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
480        };
481
482        // Either retrieve the cached leader or compute it.
483        let leader = match self.ledger().latest_leader() {
484            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
485            _ => {
486                // Compute the leader for the commit round.
487                let computed_leader = self.primary.account().address();
488                // let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
489                //     bail!("BFT failed to compute the leader for commit round {commit_round}");
490                // };
491
492                // Cache the computed leader.
493                self.ledger().update_latest_leader(commit_round, computed_leader);
494
495                computed_leader
496            }
497        };
498
499        // Retrieve the leader certificate for the commit round.
500        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
501        else {
502            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
503            return Ok(());
504        };
505        // Retrieve all of the certificates for the **certificate** round.
506        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
507            // TODO (howardwu): Investigate how many certificates we should have at this point.
508            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
509        };
510        // Construct a set over the authors who included the leader's certificate in the certificate round.
511        let authors = certificates
512            .values()
513            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
514                true => Some(c.author()),
515                false => None,
516            })
517            .collect();
518        // Check if the leader is ready to be committed.
519        if !committee_lookback.is_availability_threshold_reached(&authors) {
520            // If the leader is not ready to be committed, return early.
521            trace!("BFT is not ready to commit {commit_round}");
522            return Ok(());
523        }
524
525        /* Proceeding to commit the leader. */
526        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
527
528        // Commit the leader certificate, and all previous leader certificates since the last committed round.
529        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
530    }
531
532    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
533    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
534        &self,
535        leader_certificate: BatchCertificate<N>,
536    ) -> Result<()> {
537        // Fetch the leader round.
538        let latest_leader_round = leader_certificate.round();
539        // Determine the list of all previous leader certificates since the last committed round.
540        // The order of the leader certificates is from **newest** to **oldest**.
541        let mut leader_certificates = vec![leader_certificate.clone()];
542        {
543            // Retrieve the leader round.
544            let leader_round = leader_certificate.round();
545
546            let mut current_certificate = leader_certificate;
547            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
548            {
549                // // Retrieve the previous committee for the leader round.
550                // let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
551                //     Ok(committee) => committee,
552                //     Err(e) => {
553                //         bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
554                //     }
555                // };
556
557                // Either retrieve the cached leader or compute it.
558                let leader = match self.ledger().latest_leader() {
559                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
560                    _ => {
561                        // Compute the leader for the commit round.
562                        let computed_leader = self.primary.account().address();
563                        // let computed_leader = match previous_committee_lookback.get_leader(round) {
564                        //     Ok(leader) => leader,
565                        //     Err(e) => {
566                        //         bail!("BFT failed to compute the leader for the even round {round} - {e}");
567                        //     }
568                        // };
569
570                        // Cache the computed leader.
571                        self.ledger().update_latest_leader(round, computed_leader);
572
573                        computed_leader
574                    }
575                };
576                // Retrieve the previous leader certificate.
577                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
578                else {
579                    continue;
580                };
581                // Determine if there is a path between the previous certificate and the current certificate.
582                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
583                    // Add the previous leader certificate to the list of certificates to commit.
584                    leader_certificates.push(previous_certificate.clone());
585                    // Update the current certificate to the previous leader certificate.
586                    current_certificate = previous_certificate;
587                }
588            }
589        }
590
591        // Iterate over the leader certificates to commit.
592        for leader_certificate in leader_certificates.into_iter().rev() {
593            // Retrieve the leader certificate round.
594            let leader_round = leader_certificate.round();
595            // Compute the commit subdag.
596            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
597                Ok(subdag) => subdag,
598                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
599            };
600            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
601            if !IS_SYNCING {
602                // Initialize a map for the deduped transmissions.
603                let mut transmissions = IndexMap::new();
604                // Initialize a map for the deduped transaction ids.
605                let mut seen_transaction_ids = IndexSet::new();
606                // Initialize a map for the deduped solution ids.
607                let mut seen_solution_ids = IndexSet::new();
608                // Start from the oldest leader certificate.
609                for certificate in commit_subdag.values().flatten() {
610                    // Retrieve the transmissions.
611                    for transmission_id in certificate.transmission_ids() {
612                        // If the transaction ID or solution ID already exists in the map, skip it.
613                        // Note: This additional check is done to ensure that we do not include duplicate
614                        // transaction IDs or solution IDs that may have a different transmission ID.
615                        match transmission_id {
616                            TransmissionID::Solution(solution_id, _) => {
617                                // If the solution already exists, skip it.
618                                if seen_solution_ids.contains(&solution_id) {
619                                    continue;
620                                }
621                            }
622                            TransmissionID::Transaction(transaction_id, _) => {
623                                // If the transaction already exists, skip it.
624                                if seen_transaction_ids.contains(transaction_id) {
625                                    continue;
626                                }
627                            }
628                            TransmissionID::Ratification => {
629                                bail!("Ratifications are currently not supported in the BFT.")
630                            }
631                        }
632                        // If the transmission already exists in the map, skip it.
633                        if transmissions.contains_key(transmission_id) {
634                            continue;
635                        }
636                        // If the transmission already exists in the ledger, skip it.
637                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
638                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
639                            continue;
640                        }
641                        // Retrieve the transmission.
642                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
643                            bail!(
644                                "BFT failed to retrieve transmission '{}.{}' from round {}",
645                                fmt_id(transmission_id),
646                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
647                                certificate.round()
648                            );
649                        };
650                        // Insert the transaction ID or solution ID into the map.
651                        match transmission_id {
652                            TransmissionID::Solution(id, _) => {
653                                seen_solution_ids.insert(id);
654                            }
655                            TransmissionID::Transaction(id, _) => {
656                                seen_transaction_ids.insert(id);
657                            }
658                            TransmissionID::Ratification => {}
659                        }
660                        // Add the transmission to the set.
661                        transmissions.insert(*transmission_id, transmission);
662                    }
663                }
664                // Trigger consensus, as this will build a new block for the ledger.
665                // Construct the subdag.
666                let subdag = Subdag::from(commit_subdag.clone())?;
667                // Retrieve the anchor round.
668                let anchor_round = subdag.anchor_round();
669                // Retrieve the number of transmissions.
670                let num_transmissions = transmissions.len();
671                // Retrieve metadata about the subdag.
672                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
673
674                // Ensure the subdag anchor round matches the leader round.
675                ensure!(
676                    anchor_round == leader_round,
677                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
678                );
679
680                // Trigger consensus.
681                if let Some(consensus_sender) = self.consensus_sender.get() {
682                    // Initialize a callback sender and receiver.
683                    let (callback_sender, callback_receiver) = oneshot::channel();
684                    // Send the subdag and transmissions to consensus.
685                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
686                    // Await the callback to continue.
687                    match callback_receiver.await {
688                        Ok(Ok(())) => (), // continue
689                        Ok(Err(e)) => {
690                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
691                            return Ok(());
692                        }
693                        Err(e) => {
694                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
695                            return Ok(());
696                        }
697                    }
698                }
699
700                info!(
701                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
702                );
703            }
704
705            // Update the DAG, as the subdag was successfully included into a block.
706            let mut dag_write = self.dag.write();
707            for certificate in commit_subdag.values().flatten() {
708                dag_write.commit(certificate, self.storage().max_gc_rounds());
709            }
710        }
711
712        // Perform garbage collection based on the latest committed leader round.
713        self.storage().garbage_collect_certificates(latest_leader_round);
714
715        Ok(())
716    }
717
718    /// Returns the subdag of batch certificates to commit.
719    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
720        &self,
721        leader_certificate: BatchCertificate<N>,
722    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
723        // Initialize a map for the certificates to commit.
724        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
725        // Initialize a set for the already ordered certificates.
726        let mut already_ordered = HashSet::new();
727        // Initialize a buffer for the certificates to order.
728        let mut buffer = vec![leader_certificate];
729        // Iterate over the certificates to order.
730        while let Some(certificate) = buffer.pop() {
731            // Insert the certificate into the map.
732            commit.entry(certificate.round()).or_default().insert(certificate.clone());
733
734            // Check if the previous certificate is below the GC round.
735            let previous_round = certificate.round().saturating_sub(1);
736            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
737                continue;
738            }
739            // Iterate over the previous certificate IDs.
740            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
741            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
742            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
743                // If the previous certificate is already ordered, continue.
744                if already_ordered.contains(previous_certificate_id) {
745                    continue;
746                }
747                // If the previous certificate was recently committed, continue.
748                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
749                    continue;
750                }
751                // If the previous certificate already exists in the ledger, continue.
752                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
753                    continue;
754                }
755
756                // Retrieve the previous certificate.
757                let previous_certificate = {
758                    // Start by retrieving the previous certificate from the DAG.
759                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
760                        // If the previous certificate is found, return it.
761                        Some(previous_certificate) => previous_certificate,
762                        // If the previous certificate is not found, retrieve it from the storage.
763                        None => match self.storage().get_certificate(*previous_certificate_id) {
764                            // If the previous certificate is found, return it.
765                            Some(previous_certificate) => previous_certificate,
766                            // Otherwise, the previous certificate is missing, and throw an error.
767                            None => bail!(
768                                "Missing previous certificate {} for round {previous_round}",
769                                fmt_id(previous_certificate_id)
770                            ),
771                        },
772                    }
773                };
774                // Insert the previous certificate into the set of already ordered certificates.
775                already_ordered.insert(previous_certificate.id());
776                // Insert the previous certificate into the buffer.
777                buffer.push(previous_certificate);
778            }
779        }
780        // Ensure we only retain certificates that are above the GC round.
781        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
782        // Return the certificates to commit.
783        Ok(commit)
784    }
785
786    /// Returns `true` if there is a path from the previous certificate to the current certificate.
787    fn is_linked(
788        &self,
789        previous_certificate: BatchCertificate<N>,
790        current_certificate: BatchCertificate<N>,
791    ) -> Result<bool> {
792        // Initialize the list containing the traversal.
793        let mut traversal = vec![current_certificate.clone()];
794        // Iterate over the rounds from the current certificate to the previous certificate.
795        for round in (previous_certificate.round()..current_certificate.round()).rev() {
796            // Retrieve all of the certificates for this past round.
797            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
798                // This is a critical error, as the traversal should have these certificates.
799                // If this error is hit, it is likely that the maximum GC rounds should be increased.
800                bail!("BFT failed to retrieve the certificates for past round {round}");
801            };
802            // Filter the certificates to only include those that are in the traversal.
803            traversal = certificates
804                .into_values()
805                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
806                .collect();
807        }
808        Ok(traversal.contains(&previous_certificate))
809    }
810}
811
812impl<N: Network> BFT<N> {
813    /// Starts the BFT handlers.
814    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
815        let BFTReceiver {
816            mut rx_primary_round,
817            mut rx_primary_certificate,
818            mut rx_sync_bft_dag_at_bootup,
819            mut rx_sync_bft,
820        } = bft_receiver;
821
822        // Process the current round from the primary.
823        let self_ = self.clone();
824        self.spawn(async move {
825            while let Some((current_round, callback)) = rx_primary_round.recv().await {
826                callback.send(self_.update_to_next_round(current_round)).ok();
827            }
828        });
829
830        // Process the certificate from the primary.
831        let self_ = self.clone();
832        self.spawn(async move {
833            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
834                // Update the DAG with the certificate.
835                let result = self_.update_dag::<true, false>(certificate).await;
836                // Send the callback **after** updating the DAG.
837                // Note: We must await the DAG update before proceeding.
838                callback.send(result).ok();
839            }
840        });
841
842        // Process the request to sync the BFT DAG at bootup.
843        let self_ = self.clone();
844        self.spawn(async move {
845            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
846                self_.sync_bft_dag_at_bootup(certificates).await;
847            }
848        });
849
850        // Process the request to sync the BFT.
851        let self_ = self.clone();
852        self.spawn(async move {
853            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
854                // Update the DAG with the certificate.
855                let result = self_.update_dag::<true, true>(certificate).await;
856                // Send the callback **after** updating the DAG.
857                // Note: We must await the DAG update before proceeding.
858                callback.send(result).ok();
859            }
860        });
861    }
862
863    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
864    /// already exist in the ledger.
865    ///
866    /// This method commits all the certificates into the DAG.
867    /// Note that there is no need to insert the certificates into the DAG, because these certificates
868    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
869    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
870        // Acquire the BFT write lock.
871        let mut dag = self.dag.write();
872
873        // Commit all the certificates excluding the latest leader certificate.
874        for certificate in certificates {
875            dag.commit(&certificate, self.storage().max_gc_rounds());
876        }
877    }
878
879    /// Spawns a task with the given future; it should only be used for long-running tasks.
880    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
881        self.handles.lock().push(tokio::spawn(future));
882    }
883
884    /// Shuts down the BFT.
885    pub async fn shut_down(&self) {
886        info!("Shutting down the BFT...");
887        // Acquire the lock.
888        let _lock = self.lock.lock().await;
889        // Shut down the primary.
890        self.primary.shut_down().await;
891        // Abort the tasks.
892        self.handles.lock().iter().for_each(|handle| handle.abort());
893    }
894}
895
896#[cfg(test)]
897mod tests {
898    use crate::{
899        BFT,
900        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
901        helpers::{Storage, amareleo_storage_mode},
902    };
903
904    use amareleo_chain_account::Account;
905    use amareleo_node_bft_ledger_service::MockLedgerService;
906    use amareleo_node_bft_storage_service::BFTMemoryService;
907    use snarkvm::{
908        console::account::{Address, PrivateKey},
909        ledger::{
910            committee::Committee,
911            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
912        },
913        utilities::TestRng,
914    };
915
916    use aleo_std::StorageMode;
917    use anyhow::Result;
918    use indexmap::{IndexMap, IndexSet};
919    use rand::SeedableRng;
920    use rand_chacha::ChaChaRng;
921    use std::sync::Arc;
922
923    type CurrentNetwork = snarkvm::console::network::MainnetV0;
924
925    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
926    fn sample_test_instance(
927        committee_round: Option<u64>,
928        max_gc_rounds: u64,
929        rng: &mut TestRng,
930    ) -> (
931        Committee<CurrentNetwork>,
932        Account<CurrentNetwork>,
933        Arc<MockLedgerService<CurrentNetwork>>,
934        Storage<CurrentNetwork>,
935    ) {
936        let committee = match committee_round {
937            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
938            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
939        };
940        let account = Account::new(rng).unwrap();
941        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
942        let transmissions = Arc::new(BFTMemoryService::new());
943        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
944
945        (committee, account, ledger, storage)
946    }
947
948    #[test]
949    #[tracing_test::traced_test]
950    fn test_is_leader_quorum_odd() -> Result<()> {
951        let rng = &mut TestRng::default();
952
953        // Sample batch certificates.
954        let mut certificates = IndexSet::new();
955        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
956        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
957        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
958        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
959
960        // Initialize the committee.
961        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
962            1,
963            vec![
964                certificates[0].author(),
965                certificates[1].author(),
966                certificates[2].author(),
967                certificates[3].author(),
968            ],
969            rng,
970        );
971
972        // Initialize the ledger.
973        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
974        // Initialize the storage.
975        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
976        // Initialize the account.
977        let account = Account::new(rng)?;
978        // Initialize the BFT.
979        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
980        assert!(bft.is_timer_expired());
981        // Ensure this call succeeds on an odd round.
982        let result = bft.is_leader_quorum_or_nonleaders_available(1);
983        // If timer has expired but quorum threshold is not reached, return 'false'.
984        assert!(!result);
985        // Insert certificates into storage.
986        for certificate in certificates.iter() {
987            storage.testing_only_insert_certificate_testing_only(certificate.clone());
988        }
989        // Ensure this call succeeds on an odd round.
990        let result = bft.is_leader_quorum_or_nonleaders_available(1);
991        assert!(result); // no previous leader certificate
992        // Set the leader certificate.
993        let leader_certificate = sample_batch_certificate(rng);
994        *bft.leader_certificate.write() = Some(leader_certificate);
995        // Ensure this call succeeds on an odd round.
996        let result = bft.is_leader_quorum_or_nonleaders_available(1);
997        assert!(result); // should now fall through to the end of function
998
999        Ok(())
1000    }
1001
1002    #[test]
1003    #[tracing_test::traced_test]
1004    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1005        let rng = &mut TestRng::default();
1006
1007        // Sample the test instance.
1008        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1009        assert_eq!(committee.starting_round(), 1);
1010        assert_eq!(storage.current_round(), 1);
1011        assert_eq!(storage.max_gc_rounds(), 10);
1012
1013        // Initialize the BFT.
1014        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1015        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1016
1017        // Store is at round 1, and we are checking for round 2.
1018        // Ensure this call fails on an even round.
1019        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1020        assert!(!result);
1021        Ok(())
1022    }
1023
1024    #[test]
1025    #[tracing_test::traced_test]
1026    fn test_is_leader_quorum_even() -> Result<()> {
1027        let rng = &mut TestRng::default();
1028
1029        // Sample the test instance.
1030        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1031        assert_eq!(committee.starting_round(), 2);
1032        assert_eq!(storage.current_round(), 2);
1033        assert_eq!(storage.max_gc_rounds(), 10);
1034
1035        // Initialize the BFT.
1036        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1037        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1038
1039        // Ensure this call fails on an even round.
1040        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1041        assert!(!result);
1042        Ok(())
1043    }
1044
1045    #[test]
1046    #[tracing_test::traced_test]
1047    fn test_is_even_round_ready() -> Result<()> {
1048        let rng = &mut TestRng::default();
1049
1050        // Sample batch certificates.
1051        let mut certificates = IndexSet::new();
1052        certificates.insert(sample_batch_certificate_for_round(2, rng));
1053        certificates.insert(sample_batch_certificate_for_round(2, rng));
1054        certificates.insert(sample_batch_certificate_for_round(2, rng));
1055        certificates.insert(sample_batch_certificate_for_round(2, rng));
1056
1057        // Initialize the committee.
1058        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1059            2,
1060            vec![
1061                certificates[0].author(),
1062                certificates[1].author(),
1063                certificates[2].author(),
1064                certificates[3].author(),
1065            ],
1066            rng,
1067        );
1068
1069        // Initialize the ledger.
1070        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1071        // Initialize the storage.
1072        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1073        // Initialize the account.
1074        let account = Account::new(rng)?;
1075        // Initialize the BFT.
1076        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
1077        // Set the leader certificate.
1078        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1079        *bft.leader_certificate.write() = Some(leader_certificate);
1080        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1081        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1082        assert!(!result);
1083        // Once quorum threshold is reached, we are ready for the next round.
1084        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1085        assert!(result);
1086
1087        // Initialize a new BFT.
1088        let bft_timer = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
1089        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1090        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1091        if !bft_timer.is_timer_expired() {
1092            assert!(!result);
1093        }
1094        // Wait for the timer to expire.
1095        let leader_certificate_timeout =
1096            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1097        std::thread::sleep(leader_certificate_timeout);
1098        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1099        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1100        if bft_timer.is_timer_expired() {
1101            assert!(result);
1102        } else {
1103            assert!(!result);
1104        }
1105
1106        Ok(())
1107    }
1108
1109    #[test]
1110    #[tracing_test::traced_test]
1111    fn test_update_leader_certificate_odd() -> Result<()> {
1112        let rng = &mut TestRng::default();
1113
1114        // Sample the test instance.
1115        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1116        assert_eq!(storage.max_gc_rounds(), 10);
1117
1118        // Initialize the BFT.
1119        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1120
1121        // Ensure this call fails on an odd round.
1122        let result = bft.update_leader_certificate_to_even_round(1);
1123        assert!(!result);
1124        Ok(())
1125    }
1126
1127    #[test]
1128    #[tracing_test::traced_test]
1129    fn test_update_leader_certificate_bad_round() -> Result<()> {
1130        let rng = &mut TestRng::default();
1131
1132        // Sample the test instance.
1133        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1134        assert_eq!(storage.max_gc_rounds(), 10);
1135
1136        // Initialize the BFT.
1137        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1138
1139        // Ensure this call succeeds on an even round.
1140        let result = bft.update_leader_certificate_to_even_round(6);
1141        assert!(!result);
1142        Ok(())
1143    }
1144
1145    #[test]
1146    #[tracing_test::traced_test]
1147    fn test_update_leader_certificate_even() -> Result<()> {
1148        let rng = &mut TestRng::default();
1149
1150        // Set the current round.
1151        let current_round = 3;
1152
1153        // Sample the certificates.
1154        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1155            current_round,
1156            rng,
1157        );
1158
1159        // Initialize the committee.
1160        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1161            2,
1162            vec![
1163                certificates[0].author(),
1164                certificates[1].author(),
1165                certificates[2].author(),
1166                certificates[3].author(),
1167            ],
1168            rng,
1169        );
1170
1171        // Initialize the ledger.
1172        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1173
1174        // Initialize the storage.
1175        let transmissions = Arc::new(BFTMemoryService::new());
1176        let storage = Storage::new(ledger.clone(), transmissions, 10);
1177        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1178        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1179        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1180        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1181        assert_eq!(storage.current_round(), 2);
1182
1183        // Retrieve the leader certificate.
1184        let leader = certificates[0].author(); // committee.get_leader(2).unwrap();
1185        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1186
1187        // Initialize the BFT.
1188        let account = Account::new(rng)?;
1189        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger)?;
1190
1191        // Set the leader certificate.
1192        *bft.leader_certificate.write() = Some(leader_certificate);
1193
1194        // Update the leader certificate.
1195        // Ensure this call succeeds on an even round.
1196        let result = bft.update_leader_certificate_to_even_round(2);
1197        assert!(result);
1198
1199        Ok(())
1200    }
1201
1202    #[tokio::test]
1203    #[tracing_test::traced_test]
1204    async fn test_order_dag_with_dfs() -> Result<()> {
1205        let rng = &mut TestRng::default();
1206
1207        // Sample the test instance.
1208        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1209
1210        // Initialize the round parameters.
1211        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1212        let current_round = previous_round + 1;
1213
1214        // Sample the current certificate and previous certificates.
1215        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1216            current_round,
1217            rng,
1218        );
1219
1220        /* Test GC */
1221
1222        // Ensure the function succeeds in returning only certificates above GC.
1223        {
1224            // Initialize the storage.
1225            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1226            // Initialize the BFT.
1227            let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone())?;
1228
1229            // Insert a mock DAG in the BFT.
1230            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1231
1232            // Insert the previous certificates into the BFT.
1233            for certificate in previous_certificates.clone() {
1234                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1235            }
1236
1237            // Ensure this call succeeds and returns all given certificates.
1238            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1239            assert!(result.is_ok());
1240            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1241            assert_eq!(candidate_certificates.len(), 1);
1242            let expected_certificates = vec![certificate.clone()];
1243            assert_eq!(
1244                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1245                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1246            );
1247            assert_eq!(candidate_certificates, expected_certificates);
1248        }
1249
1250        /* Test normal case */
1251
1252        // Ensure the function succeeds in returning all given certificates.
1253        {
1254            // Initialize the storage.
1255            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1256            // Initialize the BFT.
1257            let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1258
1259            // Insert a mock DAG in the BFT.
1260            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1261
1262            // Insert the previous certificates into the BFT.
1263            for certificate in previous_certificates.clone() {
1264                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1265            }
1266
1267            // Ensure this call succeeds and returns all given certificates.
1268            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1269            assert!(result.is_ok());
1270            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1271            assert_eq!(candidate_certificates.len(), 5);
1272            let expected_certificates = vec![
1273                previous_certificates[0].clone(),
1274                previous_certificates[1].clone(),
1275                previous_certificates[2].clone(),
1276                previous_certificates[3].clone(),
1277                certificate,
1278            ];
1279            assert_eq!(
1280                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1281                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1282            );
1283            assert_eq!(candidate_certificates, expected_certificates);
1284        }
1285
1286        Ok(())
1287    }
1288
1289    #[test]
1290    #[tracing_test::traced_test]
1291    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1292        let rng = &mut TestRng::default();
1293
1294        // Sample the test instance.
1295        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1296        assert_eq!(committee.starting_round(), 1);
1297        assert_eq!(storage.current_round(), 1);
1298        assert_eq!(storage.max_gc_rounds(), 1);
1299
1300        // Initialize the round parameters.
1301        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1302        let current_round = previous_round + 1;
1303
1304        // Sample the current certificate and previous certificates.
1305        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1306            current_round,
1307            rng,
1308        );
1309        // Construct the previous certificate IDs.
1310        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1311
1312        /* Test missing previous certificate. */
1313
1314        // Initialize the BFT.
1315        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger)?;
1316
1317        // The expected error message.
1318        let error_msg = format!(
1319            "Missing previous certificate {} for round {previous_round}",
1320            crate::helpers::fmt_id(previous_certificate_ids[3]),
1321        );
1322
1323        // Ensure this call fails on a missing previous certificate.
1324        let result = bft.order_dag_with_dfs::<false>(certificate);
1325        assert!(result.is_err());
1326        assert_eq!(result.unwrap_err().to_string(), error_msg);
1327        Ok(())
1328    }
1329
1330    #[tokio::test]
1331    #[tracing_test::traced_test]
1332    async fn test_bft_gc_on_commit() -> Result<()> {
1333        let rng = &mut TestRng::default();
1334
1335        // Initialize the round parameters.
1336        let max_gc_rounds = 1;
1337        let committee_round = 0;
1338        let commit_round = 2;
1339        let current_round = commit_round + 1;
1340
1341        // Sample the certificates.
1342        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1343            current_round,
1344            rng,
1345        );
1346
1347        // Initialize the committee.
1348        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1349            committee_round,
1350            vec![
1351                certificates[0].author(),
1352                certificates[1].author(),
1353                certificates[2].author(),
1354                certificates[3].author(),
1355            ],
1356            rng,
1357        );
1358
1359        // Initialize the ledger.
1360        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1361
1362        // Initialize the storage.
1363        let transmissions = Arc::new(BFTMemoryService::new());
1364        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1365        // Insert the certificates into the storage.
1366        for certificate in certificates.iter() {
1367            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1368        }
1369
1370        // Get the leader certificate.
1371        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1372        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1373
1374        // Initialize the BFT.
1375        let account = Account::new(rng)?;
1376        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger)?;
1377        // Insert a mock DAG in the BFT.
1378        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1379
1380        // Ensure that the `gc_round` has not been updated yet.
1381        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1382
1383        // Insert the certificates into the BFT.
1384        for certificate in certificates {
1385            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1386        }
1387
1388        // Commit the leader certificate.
1389        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1390
1391        // Ensure that the `gc_round` has been updated.
1392        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1393
1394        Ok(())
1395    }
1396
1397    #[tokio::test]
1398    #[tracing_test::traced_test]
1399    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1400        let rng = &mut TestRng::default();
1401
1402        // Initialize the round parameters.
1403        let max_gc_rounds = 1;
1404        let committee_round = 0;
1405        let commit_round = 2;
1406        let current_round = commit_round + 1;
1407
1408        // Sample the current certificate and previous certificates.
1409        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1410            current_round,
1411            rng,
1412        );
1413
1414        // Initialize the committee.
1415        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1416            committee_round,
1417            vec![
1418                certificates[0].author(),
1419                certificates[1].author(),
1420                certificates[2].author(),
1421                certificates[3].author(),
1422            ],
1423            rng,
1424        );
1425
1426        // Initialize the ledger.
1427        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1428
1429        // Initialize the storage.
1430        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1431        // Insert the certificates into the storage.
1432        for certificate in certificates.iter() {
1433            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1434        }
1435
1436        // Get the leader certificate.
1437        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1438        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1439
1440        // Initialize the BFT.
1441        let account = Account::new(rng)?;
1442        let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone())?;
1443
1444        // Insert a mock DAG in the BFT.
1445        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1446
1447        // Insert the previous certificates into the BFT.
1448        for certificate in certificates.clone() {
1449            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1450        }
1451
1452        // Commit the leader certificate.
1453        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1454
1455        // Simulate a bootup of the BFT.
1456
1457        // Initialize a new instance of storage.
1458        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1459        // Initialize a new instance of BFT.
1460        let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger)?;
1461
1462        // Sync the BFT DAG at bootup.
1463        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1464
1465        // Check that the BFT starts from the same last committed round.
1466        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1467
1468        // Ensure that both BFTs have committed the leader certificate.
1469        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1470        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1471
1472        // Check the state of the bootup BFT.
1473        for certificate in certificates {
1474            let certificate_round = certificate.round();
1475            let certificate_id = certificate.id();
1476            // Check that the bootup BFT has committed the certificates.
1477            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1478            // Check that the bootup BFT does not contain the certificates in its graph, because
1479            // it should not need to order them again in subsequent subdags.
1480            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1481        }
1482
1483        Ok(())
1484    }
1485
1486    #[tokio::test]
1487    #[tracing_test::traced_test]
1488    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1489        /*
1490        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1491        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1492        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1493        */
1494
1495        let rng = &mut TestRng::default();
1496        let rng_pks = &mut ChaChaRng::seed_from_u64(1234567890u64);
1497
1498        let private_keys = vec![
1499            PrivateKey::new(rng_pks).unwrap(),
1500            PrivateKey::new(rng_pks).unwrap(),
1501            PrivateKey::new(rng_pks).unwrap(),
1502            PrivateKey::new(rng_pks).unwrap(),
1503        ];
1504        let address0 = Address::try_from(private_keys[0])?;
1505
1506        // Initialize the round parameters.
1507        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1508        let committee_round = 0;
1509        let commit_round = 2;
1510        let current_round = commit_round + 1;
1511        let next_round = current_round + 1;
1512
1513        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1514        let (round_to_certificates_map, committee) = {
1515            let addresses = vec![
1516                Address::try_from(private_keys[0])?,
1517                Address::try_from(private_keys[1])?,
1518                Address::try_from(private_keys[2])?,
1519                Address::try_from(private_keys[3])?,
1520            ];
1521            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1522                committee_round,
1523                addresses,
1524                rng,
1525            );
1526            // Initialize a mapping from the round number to the set of batch certificates in the round.
1527            let mut round_to_certificates_map: IndexMap<
1528                u64,
1529                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1530            > = IndexMap::new();
1531            let mut previous_certificates = IndexSet::with_capacity(4);
1532            // Initialize the genesis batch certificates.
1533            for _ in 0..4 {
1534                previous_certificates.insert(sample_batch_certificate(rng));
1535            }
1536            for round in 0..commit_round + 3 {
1537                let mut current_certificates = IndexSet::new();
1538                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1539                    IndexSet::new()
1540                } else {
1541                    previous_certificates.iter().map(|c| c.id()).collect()
1542                };
1543                let transmission_ids =
1544                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1545                        .into_iter()
1546                        .collect::<IndexSet<_>>();
1547                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1548                let committee_id = committee.id();
1549                for (i, private_key_1) in private_keys.iter().enumerate() {
1550                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1551                        private_key_1,
1552                        round,
1553                        timestamp,
1554                        committee_id,
1555                        transmission_ids.clone(),
1556                        previous_certificate_ids.clone(),
1557                        rng,
1558                    )
1559                    .unwrap();
1560                    let mut signatures = IndexSet::with_capacity(4);
1561                    for (j, private_key_2) in private_keys.iter().enumerate() {
1562                        if i != j {
1563                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1564                        }
1565                    }
1566                    let certificate =
1567                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1568                    current_certificates.insert(certificate);
1569                }
1570                // Update the mapping.
1571                round_to_certificates_map.insert(round, current_certificates.clone());
1572                previous_certificates = current_certificates.clone();
1573            }
1574            (round_to_certificates_map, committee)
1575        };
1576
1577        // Initialize the ledger.
1578        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1579        // Initialize the storage.
1580        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1581        // Get the leaders for the next 2 commit rounds.
1582        let leader = address0; // committee.get_leader(commit_round).unwrap();
1583        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1584        // Insert the pre shutdown certificates into the storage.
1585        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1586        for i in 1..=commit_round {
1587            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1588            if i == commit_round {
1589                // Only insert the leader certificate for the commit round.
1590                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1591                if let Some(c) = leader_certificate {
1592                    pre_shutdown_certificates.push(c.clone());
1593                }
1594                continue;
1595            }
1596            pre_shutdown_certificates.extend(certificates);
1597        }
1598        for certificate in pre_shutdown_certificates.iter() {
1599            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1600        }
1601        // Insert the post shutdown certificates into the storage.
1602        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1603            Vec::new();
1604        for j in commit_round..=commit_round + 2 {
1605            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1606            post_shutdown_certificates.extend(certificate);
1607        }
1608        for certificate in post_shutdown_certificates.iter() {
1609            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1610        }
1611        // Get the leader certificates.
1612        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1613        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1614
1615        // Initialize the BFT without bootup.
1616        let account = Account::try_from(private_keys[0])?;
1617        let bft = BFT::new(account.clone(), storage, true, amareleo_storage_mode(1, true, None), ledger.clone())?;
1618
1619        // Insert a mock DAG in the BFT without bootup.
1620        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1621
1622        // Insert the certificates into the BFT without bootup.
1623        for certificate in pre_shutdown_certificates.clone() {
1624            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1625        }
1626
1627        // Insert the post shutdown certificates into the BFT without bootup.
1628        for certificate in post_shutdown_certificates.clone() {
1629            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1630        }
1631        // Commit the second leader certificate.
1632        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1633        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1634        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1635
1636        // Simulate a bootup of the BFT.
1637
1638        // Initialize a new instance of storage.
1639        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1640
1641        // Initialize a new instance of BFT with bootup.
1642        let bootup_bft =
1643            BFT::new(account, bootup_storage.clone(), true, amareleo_storage_mode(1, true, None), ledger.clone())?;
1644
1645        // Sync the BFT DAG at bootup.
1646        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1647
1648        // Insert the post shutdown certificates to the storage and BFT with bootup.
1649        for certificate in post_shutdown_certificates.iter() {
1650            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1651        }
1652        for certificate in post_shutdown_certificates.clone() {
1653            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1654        }
1655        // Commit the second leader certificate.
1656        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1657        let commit_subdag_metadata_bootup =
1658            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1659        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1660        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1661
1662        // Check that the final state of both BFTs is the same.
1663
1664        // Check that both BFTs start from the same last committed round.
1665        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1666
1667        // Ensure that both BFTs have committed the leader certificates.
1668        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1669        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1670        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1671        assert!(
1672            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1673        );
1674
1675        // Check that the bootup BFT has committed the pre shutdown certificates.
1676        for certificate in pre_shutdown_certificates.clone() {
1677            let certificate_round = certificate.round();
1678            let certificate_id = certificate.id();
1679            // Check that both BFTs have committed the certificates.
1680            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1681            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1682            // Check that the bootup BFT does not contain the certificates in its graph, because
1683            // it should not need to order them again in subsequent subdags.
1684            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1685            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1686        }
1687
1688        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1689        for certificate in committed_certificates_bootup.clone() {
1690            let certificate_round = certificate.round();
1691            let certificate_id = certificate.id();
1692            // Check that the both BFTs have committed the certificates.
1693            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1694            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1695            // Check that the bootup BFT does not contain the certificates in its graph, because
1696            // it should not need to order them again in subsequent subdags.
1697            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1698            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1699        }
1700
1701        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1702        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1703
1704        Ok(())
1705    }
1706
1707    #[tokio::test]
1708    #[tracing_test::traced_test]
1709    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1710        /*
1711        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1712        2. Add post shutdown certificates to the bootup BFT.
1713        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1714        */
1715
1716        let rng = &mut TestRng::default();
1717        let rng_pks = &mut ChaChaRng::seed_from_u64(1234567890u64);
1718
1719        let private_keys = vec![
1720            PrivateKey::new(rng_pks).unwrap(),
1721            PrivateKey::new(rng_pks).unwrap(),
1722            PrivateKey::new(rng_pks).unwrap(),
1723            PrivateKey::new(rng_pks).unwrap(),
1724        ];
1725        let address0 = Address::try_from(private_keys[0])?;
1726
1727        // Initialize the round parameters.
1728        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1729        let committee_round = 0;
1730        let commit_round = 2;
1731        let current_round = commit_round + 1;
1732        let next_round = current_round + 1;
1733
1734        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1735        let (round_to_certificates_map, committee) = {
1736            let addresses = vec![
1737                Address::try_from(private_keys[0])?,
1738                Address::try_from(private_keys[1])?,
1739                Address::try_from(private_keys[2])?,
1740                Address::try_from(private_keys[3])?,
1741            ];
1742            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1743                committee_round,
1744                addresses,
1745                rng,
1746            );
1747            // Initialize a mapping from the round number to the set of batch certificates in the round.
1748            let mut round_to_certificates_map: IndexMap<
1749                u64,
1750                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1751            > = IndexMap::new();
1752            let mut previous_certificates = IndexSet::with_capacity(4);
1753            // Initialize the genesis batch certificates.
1754            for _ in 0..4 {
1755                previous_certificates.insert(sample_batch_certificate(rng));
1756            }
1757            for round in 0..=commit_round + 2 {
1758                let mut current_certificates = IndexSet::new();
1759                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1760                    IndexSet::new()
1761                } else {
1762                    previous_certificates.iter().map(|c| c.id()).collect()
1763                };
1764                let transmission_ids =
1765                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1766                        .into_iter()
1767                        .collect::<IndexSet<_>>();
1768                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1769                let committee_id = committee.id();
1770                for (i, private_key_1) in private_keys.iter().enumerate() {
1771                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1772                        private_key_1,
1773                        round,
1774                        timestamp,
1775                        committee_id,
1776                        transmission_ids.clone(),
1777                        previous_certificate_ids.clone(),
1778                        rng,
1779                    )
1780                    .unwrap();
1781                    let mut signatures = IndexSet::with_capacity(4);
1782                    for (j, private_key_2) in private_keys.iter().enumerate() {
1783                        if i != j {
1784                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1785                        }
1786                    }
1787                    let certificate =
1788                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1789                    current_certificates.insert(certificate);
1790                }
1791                // Update the mapping.
1792                round_to_certificates_map.insert(round, current_certificates.clone());
1793                previous_certificates = current_certificates.clone();
1794            }
1795            (round_to_certificates_map, committee)
1796        };
1797
1798        // Initialize the ledger.
1799        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1800        // Initialize the storage.
1801        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1802        // Get the leaders for the next 2 commit rounds.
1803        let leader = address0; // committee.get_leader(commit_round).unwrap();
1804        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1805        // Insert the pre shutdown certificates into the storage.
1806        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1807        for i in 1..=commit_round {
1808            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1809            if i == commit_round {
1810                // Only insert the leader certificate for the commit round.
1811                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1812                if let Some(c) = leader_certificate {
1813                    pre_shutdown_certificates.push(c.clone());
1814                }
1815                continue;
1816            }
1817            pre_shutdown_certificates.extend(certificates);
1818        }
1819        for certificate in pre_shutdown_certificates.iter() {
1820            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1821        }
1822        // Initialize the bootup BFT.
1823        let account = Account::try_from(private_keys[0])?;
1824        let bootup_bft =
1825            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone())?;
1826        // Insert a mock DAG in the BFT without bootup.
1827        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1828        // Sync the BFT DAG at bootup.
1829        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1830
1831        // Insert the post shutdown certificates into the storage.
1832        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1833            Vec::new();
1834        for j in commit_round..=commit_round + 2 {
1835            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1836            post_shutdown_certificates.extend(certificate);
1837        }
1838        for certificate in post_shutdown_certificates.iter() {
1839            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1840        }
1841
1842        // Insert the post shutdown certificates into the DAG.
1843        for certificate in post_shutdown_certificates.clone() {
1844            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1845        }
1846
1847        // Get the next leader certificate to commit.
1848        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1849        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1850        let committed_certificates = commit_subdag.values().flatten();
1851
1852        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1853        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1854            for committed_certificate in committed_certificates.clone() {
1855                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1856            }
1857        }
1858        Ok(())
1859    }
1860}