amareleo_node_bft/
bft.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::TracingHandler;
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36    console::account::Address,
37    ledger::{
38        block::Transaction,
39        committee::Committee,
40        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41        puzzle::{Solution, SolutionID},
42    },
43    prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48use parking_lot::{Mutex, RwLock};
49use std::{
50    collections::{BTreeMap, HashSet},
51    future::Future,
52    sync::{
53        Arc,
54        atomic::{AtomicI64, Ordering},
55    },
56};
57use tokio::{
58    sync::{Mutex as TMutex, OnceCell, oneshot},
59    task::JoinHandle,
60};
61use tracing::subscriber::DefaultGuard;
62
/// The BFT instance: a [`Primary`] bundled with the DAG, the leader-certificate state,
/// the consensus sender, and the spawned task handles.
///
/// Every field except `primary` and `tracing` is held behind an `Arc`, so the derived
/// `Clone` produces a handle that shares that state with the value it was cloned from.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary.
    primary: Primary<N>,
    /// The DAG, guarded by a read-write lock.
    dag: Arc<RwLock<DAG<N>>>,
    /// The batch certificate of the leader from the current even round, if one was present.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timer for the leader certificate to be received.
    /// Holds the value of `now()` taken when the BFT last advanced to a new round.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The consensus sender. It is set at most once, by `run`, and only when one is supplied.
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// The optional tracing handler, used to obtain tracing guards.
    tracing: Option<TracingHandler>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The BFT lock. It is acquired for the duration of a DAG update.
    lock: Arc<TMutex<()>>,
}
82
83impl<N: Network> BFT<N> {
84    /// Initializes a new instance of the BFT.
85    pub fn new(
86        account: Account<N>,
87        storage: Storage<N>,
88        keep_state: bool,
89        storage_mode: StorageMode,
90        ledger: Arc<dyn LedgerService<N>>,
91        tracing: Option<TracingHandler>,
92    ) -> Result<Self> {
93        Ok(Self {
94            primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
95            dag: Default::default(),
96            leader_certificate: Default::default(),
97            leader_certificate_timer: Default::default(),
98            consensus_sender: Default::default(),
99            tracing: tracing.clone(),
100            handles: Default::default(),
101            lock: Default::default(),
102        })
103    }
104
105    /// Run the BFT instance.
106    pub async fn run(
107        &mut self,
108        consensus_sender: Option<ConsensusSender<N>>,
109        primary_sender: PrimarySender<N>,
110        primary_receiver: PrimaryReceiver<N>,
111    ) -> Result<()> {
112        let _guard = self.get_tracing_guard();
113        info!("Starting the BFT instance...");
114        // Initialize the BFT channels.
115        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
116        // First, start the BFT handlers.
117        self.start_handlers(bft_receiver);
118        // Next, run the primary instance.
119        self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;
120        // Lastly, set the consensus sender.
121        // Note: This ensures during initial syncing, that the BFT does not advance the ledger.
122        if let Some(consensus_sender) = consensus_sender {
123            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
124        }
125        Ok(())
126    }
127
128    /// Returns `true` if the primary is synced.
129    pub fn is_synced(&self) -> bool {
130        self.primary.is_synced()
131    }
132
133    /// Returns the primary.
134    pub const fn primary(&self) -> &Primary<N> {
135        &self.primary
136    }
137
138    /// Returns the storage.
139    pub const fn storage(&self) -> &Storage<N> {
140        self.primary.storage()
141    }
142
143    /// Returns the ledger.
144    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
145        self.primary.ledger()
146    }
147
148    /// Returns the leader of the current even round, if one was present.
149    pub fn leader(&self) -> Option<Address<N>> {
150        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
151    }
152
153    /// Returns the certificate of the leader from the current even round, if one was present.
154    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
155        &self.leader_certificate
156    }
157
158    /// Retruns tracing guard
159    pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
160        self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
161    }
162}
163
impl<N: Network> BFT<N> {
    /// Returns the number of unconfirmed transmissions, as reported by the primary.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.primary.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications, as reported by the primary.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.primary.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions, as reported by the primary.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.primary.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions, as reported by the primary.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.primary.num_unconfirmed_transactions()
    }
}
185
186impl<N: Network> BFT<N> {
187    /// Returns the worker transmission IDs.
188    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
189        self.primary.worker_transmission_ids()
190    }
191
192    /// Returns the worker transmissions.
193    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
194        self.primary.worker_transmissions()
195    }
196
197    /// Returns the worker solutions.
198    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
199        self.primary.worker_solutions()
200    }
201
202    /// Returns the worker transactions.
203    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
204        self.primary.worker_transactions()
205    }
206}
207
208impl<N: Network> BFT<N> {
209    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
210    fn update_to_next_round(&self, current_round: u64) -> bool {
211        let _guard = self.get_tracing_guard();
212
213        // Ensure the current round is at least the storage round (this is a sanity check).
214        let storage_round = self.storage().current_round();
215        if current_round < storage_round {
216            debug!(
217                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
218            );
219            return false;
220        }
221
222        // Determine if the BFT is ready to update to the next round.
223        let is_ready = match current_round % 2 == 0 {
224            true => self.update_leader_certificate_to_even_round(current_round),
225            false => self.is_leader_quorum_or_nonleaders_available(current_round),
226        };
227
228        #[cfg(feature = "metrics")]
229        {
230            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
231            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
232            if start > 0 {
233                let end = now();
234                let elapsed = std::time::Duration::from_secs((end - start) as u64);
235                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
236            }
237        }
238
239        // Log whether the round is going to update.
240        if current_round % 2 == 0 {
241            // Determine if there is a leader certificate.
242            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
243                // Ensure the state of the leader certificate is consistent with the BFT being ready.
244                if !is_ready {
245                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
246                }
247                // Log the leader election.
248                let leader_round = leader_certificate.round();
249                match leader_round == current_round {
250                    true => {
251                        info!("\n\nRound {current_round} elected a leader - {}\n", leader_certificate.author());
252                        #[cfg(feature = "metrics")]
253                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
254                    }
255                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
256                }
257            } else {
258                match is_ready {
259                    true => info!("\n\nRound {current_round} reached quorum without a leader\n"),
260                    false => info!("{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed()),
261                }
262            }
263        }
264
265        // If the BFT is ready, then update to the next round.
266        if is_ready {
267            // Update to the next round in storage.
268            if let Err(e) = self.storage().increment_to_next_round(current_round) {
269                warn!("BFT failed to increment to the next round from round {current_round} - {e}");
270                return false;
271            }
272            // Update the timer for the leader certificate.
273            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
274        }
275
276        is_ready
277    }
278
279    /// Updates the leader certificate to the current even round,
280    /// returning `true` if the BFT is ready to update to the next round.
281    ///
282    /// This method runs on every even round, by determining the leader of the current even round,
283    /// and setting the leader certificate to their certificate in the round, if they were present.
284    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
285        let _guard = self.get_tracing_guard();
286        // Retrieve the current round.
287        let current_round = self.storage().current_round();
288        // Ensure the current round matches the given round.
289        if current_round != even_round {
290            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
291            return false;
292        }
293
294        // If the current round is odd, return false.
295        if current_round % 2 != 0 || current_round < 2 {
296            error!("BFT cannot update the leader certificate in an odd round");
297            return false;
298        }
299
300        // Retrieve the certificates for the current round.
301        let current_certificates = self.storage().get_certificates_for_round(current_round);
302        // If there are no current certificates, set the leader certificate to 'None', and return early.
303        if current_certificates.is_empty() {
304            // Set the leader certificate to 'None'.
305            *self.leader_certificate.write() = None;
306            return false;
307        }
308
309        // Retrieve the committee lookback of the current round.
310        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
311            Ok(committee) => committee,
312            Err(e) => {
313                error!("BFT failed to retrieve the committee lookback for the even round {current_round} - {e}");
314                return false;
315            }
316        };
317        // Determine the leader of the current round.
318        let leader = match self.ledger().latest_leader() {
319            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
320            _ => {
321                // Compute the leader for the current round.
322                let computed_leader = self.primary.account().address();
323                // let computed_leader = match committee_lookback.get_leader(current_round) {
324                //     Ok(leader) => leader,
325                //     Err(e) => {
326                //         error!("BFT failed to compute the leader for the even round {current_round} - {e}");
327                //         return false;
328                //     }
329                // };
330
331                // Cache the computed leader.
332                self.ledger().update_latest_leader(current_round, computed_leader);
333
334                computed_leader
335            }
336        };
337        // Find and set the leader certificate, if the leader was present in the current even round.
338        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
339        *self.leader_certificate.write() = leader_certificate.cloned();
340
341        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
342    }
343
344    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
345    ///  - If the leader certificate is set for the current even round.
346    ///  - The timer for the leader certificate has expired.
347    fn is_even_round_ready_for_next_round(
348        &self,
349        certificates: IndexSet<BatchCertificate<N>>,
350        committee: Committee<N>,
351        current_round: u64,
352    ) -> bool {
353        let _guard = self.get_tracing_guard();
354
355        // Retrieve the authors for the current round.
356        let authors = certificates.into_iter().map(|c| c.author()).collect();
357        // Check if quorum threshold is reached.
358        if !committee.is_quorum_threshold_reached(&authors) {
359            trace!("BFT failed to reach quorum threshold in even round {current_round}");
360            return false;
361        }
362        // If the leader certificate is set for the current even round, return 'true'.
363        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
364            if leader_certificate.round() == current_round {
365                return true;
366            }
367        }
368        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
369        if self.is_timer_expired() {
370            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
371            return true;
372        }
373        // Otherwise, return 'false'.
374        false
375    }
376
377    /// Returns `true` if the timer for the leader certificate has expired.
378    fn is_timer_expired(&self) -> bool {
379        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
380    }
381
382    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
383    ///  - The leader certificate is `None`.
384    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
385    ///  - The leader certificate timer has expired.
386    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
387        let _guard = self.get_tracing_guard();
388
389        // Retrieve the current round.
390        let current_round = self.storage().current_round();
391        // Ensure the current round matches the given round.
392        if current_round != odd_round {
393            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
394            return false;
395        }
396        // If the current round is even, return false.
397        if current_round % 2 != 1 {
398            error!("BFT does not compute stakes for the leader certificate in an even round");
399            return false;
400        }
401        // Retrieve the certificates for the current round.
402        let current_certificates = self.storage().get_certificates_for_round(current_round);
403        // Retrieve the committee lookback for the current round.
404        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
405            Ok(committee) => committee,
406            Err(e) => {
407                error!("BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}");
408                return false;
409            }
410        };
411        // Retrieve the authors of the current certificates.
412        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
413        // Check if quorum threshold is reached.
414        if !committee_lookback.is_quorum_threshold_reached(&authors) {
415            trace!("BFT failed reach quorum threshold in odd round {current_round}. ");
416            return false;
417        }
418        // Retrieve the leader certificate.
419        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
420            // If there is no leader certificate for the previous round, return 'true'.
421            return true;
422        };
423        // Compute the stake for the leader certificate.
424        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
425            leader_certificate.id(),
426            current_certificates,
427            &committee_lookback,
428        );
429        // Return 'true' if any of the following conditions hold:
430        stake_with_leader >= committee_lookback.availability_threshold()
431            || stake_without_leader >= committee_lookback.quorum_threshold()
432            || self.is_timer_expired()
433    }
434
435    /// Computes the amount of stake that has & has not signed for the leader certificate.
436    fn compute_stake_for_leader_certificate(
437        &self,
438        leader_certificate_id: Field<N>,
439        current_certificates: IndexSet<BatchCertificate<N>>,
440        current_committee: &Committee<N>,
441    ) -> (u64, u64) {
442        // If there are no current certificates, return early.
443        if current_certificates.is_empty() {
444            return (0, 0);
445        }
446
447        // Initialize a tracker for the stake with the leader.
448        let mut stake_with_leader = 0u64;
449        // Initialize a tracker for the stake without the leader.
450        let mut stake_without_leader = 0u64;
451        // Iterate over the current certificates.
452        for certificate in current_certificates {
453            // Retrieve the stake for the author of the certificate.
454            let stake = current_committee.get_stake(certificate.author());
455            // Determine if the certificate includes the leader.
456            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
457                // If the certificate includes the leader, add the stake to the stake with the leader.
458                true => stake_with_leader = stake_with_leader.saturating_add(stake),
459                // If the certificate does not include the leader, add the stake to the stake without the leader.
460                false => stake_without_leader = stake_without_leader.saturating_add(stake),
461            }
462        }
463        // Return the stake with the leader, and the stake without the leader.
464        (stake_with_leader, stake_without_leader)
465    }
466}
467
468impl<N: Network> BFT<N> {
469    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
470    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
471        &self,
472        certificate: BatchCertificate<N>,
473    ) -> Result<()> {
474        // Acquire the BFT lock.
475        let _lock = self.lock.lock().await;
476        let _guard = self.get_tracing_guard();
477
478        // Retrieve the certificate round.
479        let certificate_round = certificate.round();
480        // Insert the certificate into the DAG.
481        self.dag.write().insert(certificate, self.tracing.clone());
482
483        // Construct the commit round.
484        let commit_round = certificate_round.saturating_sub(1);
485        // If the commit round is odd, return early.
486        if commit_round % 2 != 0 || commit_round < 2 {
487            return Ok(());
488        }
489        // If the commit round is at or below the last committed round, return early.
490        if commit_round <= self.dag.read().last_committed_round() {
491            return Ok(());
492        }
493
494        /* Proceeding to check if the leader is ready to be committed. */
495        info!("Checking if the leader is ready to be committed for round {commit_round}...");
496
497        // Retrieve the committee lookback for the commit round.
498        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
499            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
500        };
501
502        // Either retrieve the cached leader or compute it.
503        let leader = match self.ledger().latest_leader() {
504            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
505            _ => {
506                // Compute the leader for the commit round.
507                let computed_leader = self.primary.account().address();
508                // let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
509                //     bail!("BFT failed to compute the leader for commit round {commit_round}");
510                // };
511
512                // Cache the computed leader.
513                self.ledger().update_latest_leader(commit_round, computed_leader);
514
515                computed_leader
516            }
517        };
518
519        // Retrieve the leader certificate for the commit round.
520        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
521        else {
522            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
523            return Ok(());
524        };
525        // Retrieve all of the certificates for the **certificate** round.
526        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
527            // TODO (howardwu): Investigate how many certificates we should have at this point.
528            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
529        };
530        // Construct a set over the authors who included the leader's certificate in the certificate round.
531        let authors = certificates
532            .values()
533            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
534                true => Some(c.author()),
535                false => None,
536            })
537            .collect();
538        // Check if the leader is ready to be committed.
539        if !committee_lookback.is_availability_threshold_reached(&authors) {
540            // If the leader is not ready to be committed, return early.
541            trace!("BFT is not ready to commit {commit_round}");
542            return Ok(());
543        }
544
545        /* Proceeding to commit the leader. */
546        info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
547
548        // Commit the leader certificate, and all previous leader certificates since the last committed round.
549        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
550    }
551
552    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
553    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
554        &self,
555        leader_certificate: BatchCertificate<N>,
556    ) -> Result<()> {
557        let _guard = self.get_tracing_guard();
558
559        // Fetch the leader round.
560        let latest_leader_round = leader_certificate.round();
561        // Determine the list of all previous leader certificates since the last committed round.
562        // The order of the leader certificates is from **newest** to **oldest**.
563        let mut leader_certificates = vec![leader_certificate.clone()];
564        {
565            // Retrieve the leader round.
566            let leader_round = leader_certificate.round();
567
568            let mut current_certificate = leader_certificate;
569            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
570            {
571                // // Retrieve the previous committee for the leader round.
572                // let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
573                //     Ok(committee) => committee,
574                //     Err(e) => {
575                //         bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
576                //     }
577                // };
578
579                // Either retrieve the cached leader or compute it.
580                let leader = match self.ledger().latest_leader() {
581                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
582                    _ => {
583                        // Compute the leader for the commit round.
584                        let computed_leader = self.primary.account().address();
585                        // let computed_leader = match previous_committee_lookback.get_leader(round) {
586                        //     Ok(leader) => leader,
587                        //     Err(e) => {
588                        //         bail!("BFT failed to compute the leader for the even round {round} - {e}");
589                        //     }
590                        // };
591
592                        // Cache the computed leader.
593                        self.ledger().update_latest_leader(round, computed_leader);
594
595                        computed_leader
596                    }
597                };
598                // Retrieve the previous leader certificate.
599                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
600                else {
601                    continue;
602                };
603                // Determine if there is a path between the previous certificate and the current certificate.
604                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
605                    // Add the previous leader certificate to the list of certificates to commit.
606                    leader_certificates.push(previous_certificate.clone());
607                    // Update the current certificate to the previous leader certificate.
608                    current_certificate = previous_certificate;
609                }
610            }
611        }
612
613        // Iterate over the leader certificates to commit.
614        for leader_certificate in leader_certificates.into_iter().rev() {
615            // Retrieve the leader certificate round.
616            let leader_round = leader_certificate.round();
617            // Compute the commit subdag.
618            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
619                Ok(subdag) => subdag,
620                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
621            };
622            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
623            if !IS_SYNCING {
624                // Initialize a map for the deduped transmissions.
625                let mut transmissions = IndexMap::new();
626                // Initialize a map for the deduped transaction ids.
627                let mut seen_transaction_ids = IndexSet::new();
628                // Initialize a map for the deduped solution ids.
629                let mut seen_solution_ids = IndexSet::new();
630                // Start from the oldest leader certificate.
631                for certificate in commit_subdag.values().flatten() {
632                    // Retrieve the transmissions.
633                    for transmission_id in certificate.transmission_ids() {
634                        // If the transaction ID or solution ID already exists in the map, skip it.
635                        // Note: This additional check is done to ensure that we do not include duplicate
636                        // transaction IDs or solution IDs that may have a different transmission ID.
637                        match transmission_id {
638                            TransmissionID::Solution(solution_id, _) => {
639                                // If the solution already exists, skip it.
640                                if seen_solution_ids.contains(&solution_id) {
641                                    continue;
642                                }
643                            }
644                            TransmissionID::Transaction(transaction_id, _) => {
645                                // If the transaction already exists, skip it.
646                                if seen_transaction_ids.contains(transaction_id) {
647                                    continue;
648                                }
649                            }
650                            TransmissionID::Ratification => {
651                                bail!("Ratifications are currently not supported in the BFT.")
652                            }
653                        }
654                        // If the transmission already exists in the map, skip it.
655                        if transmissions.contains_key(transmission_id) {
656                            continue;
657                        }
658                        // If the transmission already exists in the ledger, skip it.
659                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
660                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
661                            continue;
662                        }
663                        // Retrieve the transmission.
664                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
665                            bail!(
666                                "BFT failed to retrieve transmission '{}.{}' from round {}",
667                                fmt_id(transmission_id),
668                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
669                                certificate.round()
670                            );
671                        };
672                        // Insert the transaction ID or solution ID into the map.
673                        match transmission_id {
674                            TransmissionID::Solution(id, _) => {
675                                seen_solution_ids.insert(id);
676                            }
677                            TransmissionID::Transaction(id, _) => {
678                                seen_transaction_ids.insert(id);
679                            }
680                            TransmissionID::Ratification => {}
681                        }
682                        // Add the transmission to the set.
683                        transmissions.insert(*transmission_id, transmission);
684                    }
685                }
686                // Trigger consensus, as this will build a new block for the ledger.
687                // Construct the subdag.
688                let subdag = Subdag::from(commit_subdag.clone())?;
689                // Retrieve the anchor round.
690                let anchor_round = subdag.anchor_round();
691                // Retrieve the number of transmissions.
692                let num_transmissions = transmissions.len();
693                // Retrieve metadata about the subdag.
694                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
695
696                // Ensure the subdag anchor round matches the leader round.
697                ensure!(
698                    anchor_round == leader_round,
699                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
700                );
701
702                // Trigger consensus.
703                if let Some(consensus_sender) = self.consensus_sender.get() {
704                    // Initialize a callback sender and receiver.
705                    let (callback_sender, callback_receiver) = oneshot::channel();
706                    // Send the subdag and transmissions to consensus.
707                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
708                    // Await the callback to continue.
709                    match callback_receiver.await {
710                        Ok(Ok(())) => (), // continue
711                        Ok(Err(e)) => {
712                            error!("BFT failed to advance the subdag for round {anchor_round} - {e}");
713                            return Ok(());
714                        }
715                        Err(e) => {
716                            error!("BFT failed to receive the callback for round {anchor_round} - {e}");
717                            return Ok(());
718                        }
719                    }
720                }
721
722                info!(
723                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
724                );
725            }
726
727            // Update the DAG, as the subdag was successfully included into a block.
728            let mut dag_write = self.dag.write();
729            for certificate in commit_subdag.values().flatten() {
730                dag_write.commit(certificate, self.storage().max_gc_rounds());
731            }
732        }
733
734        // Perform garbage collection based on the latest committed leader round.
735        self.storage().garbage_collect_certificates(latest_leader_round);
736
737        Ok(())
738    }
739
740    /// Returns the subdag of batch certificates to commit.
741    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
742        &self,
743        leader_certificate: BatchCertificate<N>,
744    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
745        // Initialize a map for the certificates to commit.
746        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
747        // Initialize a set for the already ordered certificates.
748        let mut already_ordered = HashSet::new();
749        // Initialize a buffer for the certificates to order.
750        let mut buffer = vec![leader_certificate];
751        // Iterate over the certificates to order.
752        while let Some(certificate) = buffer.pop() {
753            // Insert the certificate into the map.
754            commit.entry(certificate.round()).or_default().insert(certificate.clone());
755
756            // Check if the previous certificate is below the GC round.
757            let previous_round = certificate.round().saturating_sub(1);
758            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
759                continue;
760            }
761            // Iterate over the previous certificate IDs.
762            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
763            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
764            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
765                // If the previous certificate is already ordered, continue.
766                if already_ordered.contains(previous_certificate_id) {
767                    continue;
768                }
769                // If the previous certificate was recently committed, continue.
770                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
771                    continue;
772                }
773                // If the previous certificate already exists in the ledger, continue.
774                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
775                    continue;
776                }
777
778                // Retrieve the previous certificate.
779                let previous_certificate = {
780                    // Start by retrieving the previous certificate from the DAG.
781                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
782                        // If the previous certificate is found, return it.
783                        Some(previous_certificate) => previous_certificate,
784                        // If the previous certificate is not found, retrieve it from the storage.
785                        None => match self.storage().get_certificate(*previous_certificate_id) {
786                            // If the previous certificate is found, return it.
787                            Some(previous_certificate) => previous_certificate,
788                            // Otherwise, the previous certificate is missing, and throw an error.
789                            None => bail!(
790                                "Missing previous certificate {} for round {previous_round}",
791                                fmt_id(previous_certificate_id)
792                            ),
793                        },
794                    }
795                };
796                // Insert the previous certificate into the set of already ordered certificates.
797                already_ordered.insert(previous_certificate.id());
798                // Insert the previous certificate into the buffer.
799                buffer.push(previous_certificate);
800            }
801        }
802        // Ensure we only retain certificates that are above the GC round.
803        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
804        // Return the certificates to commit.
805        Ok(commit)
806    }
807
808    /// Returns `true` if there is a path from the previous certificate to the current certificate.
809    fn is_linked(
810        &self,
811        previous_certificate: BatchCertificate<N>,
812        current_certificate: BatchCertificate<N>,
813    ) -> Result<bool> {
814        // Initialize the list containing the traversal.
815        let mut traversal = vec![current_certificate.clone()];
816        // Iterate over the rounds from the current certificate to the previous certificate.
817        for round in (previous_certificate.round()..current_certificate.round()).rev() {
818            // Retrieve all of the certificates for this past round.
819            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
820                // This is a critical error, as the traversal should have these certificates.
821                // If this error is hit, it is likely that the maximum GC rounds should be increased.
822                bail!("BFT failed to retrieve the certificates for past round {round}");
823            };
824            // Filter the certificates to only include those that are in the traversal.
825            traversal = certificates
826                .into_values()
827                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
828                .collect();
829        }
830        Ok(traversal.contains(&previous_certificate))
831    }
832}
833
834impl<N: Network> BFT<N> {
835    /// Starts the BFT handlers.
836    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
837        let BFTReceiver {
838            mut rx_primary_round,
839            mut rx_primary_certificate,
840            mut rx_sync_bft_dag_at_bootup,
841            mut rx_sync_bft,
842        } = bft_receiver;
843
844        // Process the current round from the primary.
845        let self_ = self.clone();
846        self.spawn(async move {
847            while let Some((current_round, callback)) = rx_primary_round.recv().await {
848                callback.send(self_.update_to_next_round(current_round)).ok();
849            }
850        });
851
852        // Process the certificate from the primary.
853        let self_ = self.clone();
854        self.spawn(async move {
855            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
856                // Update the DAG with the certificate.
857                let result = self_.update_dag::<true, false>(certificate).await;
858                // Send the callback **after** updating the DAG.
859                // Note: We must await the DAG update before proceeding.
860                callback.send(result).ok();
861            }
862        });
863
864        // Process the request to sync the BFT DAG at bootup.
865        let self_ = self.clone();
866        self.spawn(async move {
867            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
868                self_.sync_bft_dag_at_bootup(certificates).await;
869            }
870        });
871
872        // Process the request to sync the BFT.
873        let self_ = self.clone();
874        self.spawn(async move {
875            while let Some((certificate, callback)) = rx_sync_bft.recv().await {
876                // Update the DAG with the certificate.
877                let result = self_.update_dag::<true, true>(certificate).await;
878                // Send the callback **after** updating the DAG.
879                // Note: We must await the DAG update before proceeding.
880                callback.send(result).ok();
881            }
882        });
883    }
884
885    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
886    /// already exist in the ledger.
887    ///
888    /// This method commits all the certificates into the DAG.
889    /// Note that there is no need to insert the certificates into the DAG, because these certificates
890    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
891    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
892        // Acquire the BFT write lock.
893        let mut dag = self.dag.write();
894
895        // Commit all the certificates.
896        for certificate in certificates {
897            dag.commit(&certificate, self.storage().max_gc_rounds());
898        }
899    }
900
901    /// Spawns a task with the given future; it should only be used for long-running tasks.
902    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
903        self.handles.lock().push(tokio::spawn(future));
904    }
905
906    /// Shuts down the BFT.
907    pub async fn shut_down(&self) {
908        let _guard = self.get_tracing_guard();
909        info!("Shutting down the BFT...");
910        // Acquire the lock.
911        let _lock = self.lock.lock().await;
912        // Shut down the primary.
913        self.primary.shut_down().await;
914        // Abort the tasks.
915        self.handles.lock().iter().for_each(|handle| handle.abort());
916    }
917}
918
919#[cfg(test)]
920mod tests {
921    use crate::{
922        BFT,
923        DEVELOPMENT_MODE_RNG_SEED,
924        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
925        helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
926    };
927
928    use amareleo_chain_account::Account;
929    use amareleo_node_bft_ledger_service::MockLedgerService;
930    use amareleo_node_bft_storage_service::BFTMemoryService;
931    use snarkvm::{
932        console::account::{Address, PrivateKey},
933        ledger::{
934            committee::Committee,
935            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
936        },
937        utilities::TestRng,
938    };
939
940    use aleo_std::StorageMode;
941    use anyhow::Result;
942    use indexmap::{IndexMap, IndexSet};
943    use rand::SeedableRng;
944    use rand_chacha::ChaChaRng;
945    use std::sync::Arc;
946
947    type CurrentNetwork = snarkvm::console::network::MainnetV0;
948
949    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
950    fn sample_test_instance(
951        committee_round: Option<u64>,
952        max_gc_rounds: u64,
953        rng: &mut TestRng,
954    ) -> (
955        Committee<CurrentNetwork>,
956        Account<CurrentNetwork>,
957        Arc<MockLedgerService<CurrentNetwork>>,
958        Storage<CurrentNetwork>,
959    ) {
960        let committee = match committee_round {
961            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
962            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
963        };
964        let account = Account::new(rng).unwrap();
965        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
966        let transmissions = Arc::new(BFTMemoryService::new());
967        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
968
969        (committee, account, ledger, storage)
970    }
971
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_odd() -> Result<()> {
    use snarkvm::ledger::{
        committee::test_helpers::sample_committee_for_round_and_members,
        narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids
            as sample_round_certificate,
    };

    let rng = &mut TestRng::default();

    // Sample four round-1 batch certificates that reference no previous certificates.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(sample_round_certificate(1, IndexSet::new(), rng));
    }

    // Build a round-1 committee out of the four certificate authors.
    let members = (0..4).map(|index| certificates[index].author()).collect::<Vec<_>>();
    let committee = sample_committee_for_round_and_members(1, members, rng);

    // Set up the ledger, the storage, the account, and the BFT.
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // With an expired timer but an empty storage, the quorum threshold is not reached.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(1));

    // Insert the certificates into the storage.
    for certificate in &certificates {
        storage.testing_only_insert_certificate_testing_only(certificate.clone());
    }

    // The check now passes, while no leader certificate is set.
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    // Set a leader certificate; the check is expected to keep passing.
    let leader_certificate = sample_batch_certificate(rng);
    *bft.leader_certificate.write() = Some(leader_certificate);
    assert!(bft.is_leader_quorum_or_nonleaders_available(1));

    Ok(())
}
1025
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance whose committee starts at round 1.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, &mut rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 10);

    // A freshly built BFT is expected to report an expired leader certificate timer.
    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // The storage is at round 1 while round 2 is queried, so the check must fail.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(2));

    Ok(())
}
1047
#[test]
#[tracing_test::traced_test]
fn test_is_leader_quorum_even() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance whose committee starts at round 2.
    let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, &mut rng);
    assert_eq!(committee.starting_round(), 2);
    assert_eq!(storage.current_round(), 2);
    assert_eq!(storage.max_gc_rounds(), 10);

    // A freshly built BFT is expected to report an expired leader certificate timer.
    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
    assert!(bft.is_timer_expired());

    // The check must fail when queried for the even round 2.
    assert!(!bft.is_leader_quorum_or_nonleaders_available(2));

    Ok(())
}
1068
#[test]
#[tracing_test::traced_test]
fn test_is_even_round_ready() -> Result<()> {
    use snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members;
    use std::time::Duration;

    let rng = &mut TestRng::default();

    // Sample four batch certificates for round 2.
    let mut certificates = IndexSet::new();
    for _ in 0..4 {
        certificates.insert(sample_batch_certificate_for_round(2, rng));
    }

    // Build a round-2 committee out of the four certificate authors.
    let members = (0..4).map(|index| certificates[index].author()).collect::<Vec<_>>();
    let committee = sample_committee_for_round_and_members(2, members, rng);

    // Set up the ledger, the storage, the account, and the BFT.
    let ledger = Arc::new(MockLedgerService::new(committee.clone()));
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    let account = Account::new(rng)?;
    let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;

    // Set a leader certificate for round 2.
    let leader_certificate = sample_batch_certificate_for_round(2, rng);
    *bft.leader_certificate.write() = Some(leader_certificate);

    // With a leader certificate but no certificates at all, the round is not ready.
    assert!(!bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2));
    // With all four certificates, the round is ready.
    assert!(bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2));

    // Build a second BFT that has no leader certificate set.
    let bft_timer = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // Without a leader certificate, the round is not ready for as long as the timer has not expired.
    let ready_before_wait = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    if !bft_timer.is_timer_expired() {
        assert!(!ready_before_wait);
    }

    // Wait for the maximum leader certificate delay.
    std::thread::sleep(Duration::from_secs(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64));

    // After the wait, the round is ready exactly when the timer reports as expired.
    let ready_after_wait = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
    assert_eq!(ready_after_wait, bft_timer.is_timer_expired());

    Ok(())
}
1133
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_odd() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance with the default committee.
    let (_, account, ledger, storage) = sample_test_instance(None, 10, &mut rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // Updating the leader certificate for the odd round 1 must report `false`.
    assert!(!bft.update_leader_certificate_to_even_round(1));

    Ok(())
}
1151
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_bad_round() -> Result<()> {
    let mut rng = TestRng::default();

    // Sample a test instance with the default committee.
    let (_, account, ledger, storage) = sample_test_instance(None, 10, &mut rng);
    assert_eq!(storage.max_gc_rounds(), 10);

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // Round 6 is even, yet the update must still report `false` for this freshly sampled instance.
    assert!(!bft.update_leader_certificate_to_even_round(6));

    Ok(())
}
1169
#[test]
#[tracing_test::traced_test]
fn test_update_leader_certificate_even() -> Result<()> {
    use snarkvm::ledger::{
        committee::test_helpers::sample_committee_for_round_and_members,
        narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates,
    };

    let rng = &mut TestRng::default();

    // Sample a certificate for round 3; only its previous certificates are used below.
    let current_round = 3;
    let (_, certificates) = sample_batch_certificate_with_previous_certificates(current_round, rng);

    // Build a round-2 committee out of the first four certificate authors.
    let members = (0..4).map(|index| certificates[index].author()).collect::<Vec<_>>();
    let committee = sample_committee_for_round_and_members(2, members, rng);

    // Set up the ledger.
    let ledger = Arc::new(MockLedgerService::new(committee));

    // Set up the storage, and insert the first four certificates into it.
    let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
    for index in 0..4 {
        storage.testing_only_insert_certificate_testing_only(certificates[index].clone());
    }
    assert_eq!(storage.current_round(), 2);

    // Use the author of the first certificate as the leader, and fetch its round-2 certificate.
    let leader = certificates[0].author();
    let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();

    // Set up the BFT with a fresh account.
    let account = Account::new(rng)?;
    let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;

    // Set the leader certificate.
    *bft.leader_certificate.write() = Some(leader_certificate);

    // Updating the leader certificate for the even round 2 must report `true`.
    assert!(bft.update_leader_certificate_to_even_round(2));

    Ok(())
}
1226
1227    #[tokio::test]
1228    #[tracing_test::traced_test]
1229    async fn test_order_dag_with_dfs() -> Result<()> {
1230        let rng = &mut TestRng::default();
1231
1232        // Sample the test instance.
1233        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1234
1235        // Initialize the round parameters.
1236        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1237        let current_round = previous_round + 1;
1238
1239        // Sample the current certificate and previous certificates.
1240        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1241            current_round,
1242            rng,
1243        );
1244
1245        /* Test GC */
1246
1247        // Ensure the function succeeds in returning only certificates above GC.
1248        {
1249            // Initialize the storage.
1250            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1251            // Initialize the BFT.
1252            let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1253
1254            // Insert a mock DAG in the BFT.
1255            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1256
1257            // Insert the previous certificates into the BFT.
1258            for certificate in previous_certificates.clone() {
1259                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1260            }
1261
1262            // Ensure this call succeeds and returns all given certificates.
1263            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1264            assert!(result.is_ok());
1265            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1266            assert_eq!(candidate_certificates.len(), 1);
1267            let expected_certificates = vec![certificate.clone()];
1268            assert_eq!(
1269                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1270                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1271            );
1272            assert_eq!(candidate_certificates, expected_certificates);
1273        }
1274
1275        /* Test normal case */
1276
1277        // Ensure the function succeeds in returning all given certificates.
1278        {
1279            // Initialize the storage.
1280            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1281            // Initialize the BFT.
1282            let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1283
1284            // Insert a mock DAG in the BFT.
1285            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1286
1287            // Insert the previous certificates into the BFT.
1288            for certificate in previous_certificates.clone() {
1289                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1290            }
1291
1292            // Ensure this call succeeds and returns all given certificates.
1293            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1294            assert!(result.is_ok());
1295            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1296            assert_eq!(candidate_certificates.len(), 5);
1297            let expected_certificates = vec![
1298                previous_certificates[0].clone(),
1299                previous_certificates[1].clone(),
1300                previous_certificates[2].clone(),
1301                previous_certificates[3].clone(),
1302                certificate,
1303            ];
1304            assert_eq!(
1305                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1306                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1307            );
1308            assert_eq!(candidate_certificates, expected_certificates);
1309        }
1310
1311        Ok(())
1312    }
1313
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates;

    let rng = &mut TestRng::default();

    // Sample a test instance whose committee starts at round 1, with a single GC round.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    // Note: The previous round must be even, for `BFT::update_dag` to behave correctly.
    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample the current certificate together with its previous certificates.
    let (certificate, previous_certificates) = sample_batch_certificate_with_previous_certificates(current_round, rng);
    // Collect the IDs of the previous certificates.
    let previous_certificate_ids = previous_certificates.iter().map(|c| c.id()).collect::<IndexSet<_>>();

    /* None of the previous certificates is inserted into the BFT. */

    let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;

    // The error is expected to name the previous certificate ID at index 3.
    let error_msg = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    // The ordering must fail with exactly that error.
    let result = bft.order_dag_with_dfs::<false>(certificate);
    assert!(result.is_err());
    assert_eq!(result.unwrap_err().to_string(), error_msg);

    Ok(())
}
1354
1355    #[tokio::test]
1356    #[tracing_test::traced_test]
1357    async fn test_bft_gc_on_commit() -> Result<()> {
1358        let rng = &mut TestRng::default();
1359
1360        // Initialize the round parameters.
1361        let max_gc_rounds = 1;
1362        let committee_round = 0;
1363        let commit_round = 2;
1364        let current_round = commit_round + 1;
1365
1366        // Sample the certificates.
1367        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1368            current_round,
1369            rng,
1370        );
1371
1372        // Initialize the committee.
1373        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1374            committee_round,
1375            vec![
1376                certificates[0].author(),
1377                certificates[1].author(),
1378                certificates[2].author(),
1379                certificates[3].author(),
1380            ],
1381            rng,
1382        );
1383
1384        // Initialize the ledger.
1385        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1386
1387        // Initialize the storage.
1388        let transmissions = Arc::new(BFTMemoryService::new());
1389        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1390        // Insert the certificates into the storage.
1391        for certificate in certificates.iter() {
1392            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1393        }
1394
1395        // Get the leader certificate.
1396        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1397        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1398
1399        // Initialize the BFT.
1400        let account = Account::new(rng)?;
1401        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1402        // Insert a mock DAG in the BFT.
1403        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1404
1405        // Ensure that the `gc_round` has not been updated yet.
1406        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1407
1408        // Insert the certificates into the BFT.
1409        for certificate in certificates {
1410            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1411        }
1412
1413        // Commit the leader certificate.
1414        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1415
1416        // Ensure that the `gc_round` has been updated.
1417        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1418
1419        Ok(())
1420    }
1421
1422    #[tokio::test]
1423    #[tracing_test::traced_test]
1424    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1425        let rng = &mut TestRng::default();
1426
1427        // Initialize the round parameters.
1428        let max_gc_rounds = 1;
1429        let committee_round = 0;
1430        let commit_round = 2;
1431        let current_round = commit_round + 1;
1432
1433        // Sample the current certificate and previous certificates.
1434        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1435            current_round,
1436            rng,
1437        );
1438
1439        // Initialize the committee.
1440        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1441            committee_round,
1442            vec![
1443                certificates[0].author(),
1444                certificates[1].author(),
1445                certificates[2].author(),
1446                certificates[3].author(),
1447            ],
1448            rng,
1449        );
1450
1451        // Initialize the ledger.
1452        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1453
1454        // Initialize the storage.
1455        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1456        // Insert the certificates into the storage.
1457        for certificate in certificates.iter() {
1458            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1459        }
1460
1461        // Get the leader certificate.
1462        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1463        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1464
1465        // Initialize the BFT.
1466        let account = Account::new(rng)?;
1467        let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1468
1469        // Insert a mock DAG in the BFT.
1470        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1471
1472        // Insert the previous certificates into the BFT.
1473        for certificate in certificates.clone() {
1474            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1475        }
1476
1477        // Commit the leader certificate.
1478        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1479
1480        // Simulate a bootup of the BFT.
1481
1482        // Initialize a new instance of storage.
1483        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1484        // Initialize a new instance of BFT.
1485        let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1486
1487        // Sync the BFT DAG at bootup.
1488        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1489
1490        // Check that the BFT starts from the same last committed round.
1491        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1492
1493        // Ensure that both BFTs have committed the leader certificate.
1494        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1495        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1496
1497        // Check the state of the bootup BFT.
1498        for certificate in certificates {
1499            let certificate_round = certificate.round();
1500            let certificate_id = certificate.id();
1501            // Check that the bootup BFT has committed the certificates.
1502            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1503            // Check that the bootup BFT does not contain the certificates in its graph, because
1504            // it should not need to order them again in subsequent subdags.
1505            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1506        }
1507
1508        Ok(())
1509    }
1510
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
        /*
        Scenario:
        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
        2. Run a separate bootup BFT that syncs with a set of pre-shutdown certificates, and then commits a second leader normally over a set of post-shutdown certificates.
        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
        */

        // `rng` drives all sampling; `rng_pks` is a separate generator, seeded with
        // `DEVELOPMENT_MODE_RNG_SEED`, that is used only to derive the validator private keys.
        let rng = &mut TestRng::default();
        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);

        // The four validator private keys.
        let private_keys = vec![
            PrivateKey::new(rng_pks).unwrap(),
            PrivateKey::new(rng_pks).unwrap(),
            PrivateKey::new(rng_pks).unwrap(),
            PrivateKey::new(rng_pks).unwrap(),
        ];
        // The address of the first key; used below as the leader of both commits.
        let address0 = Address::try_from(private_keys[0])?;

        // Initialize the round parameters.
        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let committee_round = 0;
        let commit_round = 2;
        let current_round = commit_round + 1;
        let next_round = current_round + 1;

        // Sample 5 rounds of batch certificates (rounds 0 through `commit_round + 2`) from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];
            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
                committee_round,
                addresses,
                rng,
            );
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: IndexMap<
                u64,
                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
            > = IndexMap::new();
            let mut previous_certificates = IndexSet::with_capacity(4);
            // Initialize the genesis batch certificates.
            // Note: these placeholders are never referenced by a batch header, because rounds 0 and 1
            // use an empty set of previous certificate IDs and `previous_certificates` is overwritten
            // at the end of round 0. Sampling them still consumes values from `rng`.
            for _ in 0..4 {
                previous_certificates.insert(sample_batch_certificate(rng));
            }
            for round in 0..commit_round + 3 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 carry no previous certificate IDs; every later round references
                // all certificates of the preceding round.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                // All authors in a round share the same transmission IDs, timestamp, and committee ID.
                let transmission_ids =
                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
                        .into_iter()
                        .collect::<IndexSet<_>>();
                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
                let committee_id = committee.id();
                // Create one certificate per author for this round.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
                        private_key_1,
                        round,
                        timestamp,
                        committee_id,
                        transmission_ids.clone(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Collect a signature over the batch ID from every key except the author's own.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    let certificate =
                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
                    current_certificates.insert(certificate);
                }
                // Update the mapping.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the ledger.
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        // Initialize the storage.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
        // Get the leaders for the next 2 commit rounds.
        // Both are pinned to the first validator's address instead of being derived from the committee.
        let leader = address0; // committee.get_leader(commit_round).unwrap();
        let next_leader = address0; // committee.get_leader(next_round).unwrap();
        // Collect the pre-shutdown certificates: every certificate of the rounds before `commit_round`
        // (starting at round 1), plus only the leader's certificate of `commit_round`.
        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round {
            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
            if i == commit_round {
                // Only insert the leader certificate for the commit round.
                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
                if let Some(c) = leader_certificate {
                    pre_shutdown_certificates.push(c.clone());
                }
                continue;
            }
            pre_shutdown_certificates.extend(certificates);
        }
        // Insert the pre-shutdown certificates into the storage.
        for certificate in pre_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Collect the post-shutdown certificates: every certificate of rounds `commit_round`
        // through `commit_round + 2`, and insert them into the storage.
        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
            Vec::new();
        for j in commit_round..=commit_round + 2 {
            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
            post_shutdown_certificates.extend(certificate);
        }
        for certificate in post_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Get the leader certificates.
        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();

        // Initialize the BFT without bootup.
        let account = Account::try_from(private_keys[0])?;
        let ledger_dir = default_ledger_dir(1, true, "0");
        let storage_mode = amareleo_storage_mode(ledger_dir);
        let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;

        // Insert a mock DAG in the BFT without bootup.
        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);

        // Insert the pre-shutdown certificates into the BFT without bootup.
        for certificate in pre_shutdown_certificates.clone() {
            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
        }

        // Insert the post-shutdown certificates into the BFT without bootup.
        for certificate in post_shutdown_certificates.clone() {
            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
        }
        // Record the (round, certificate count) shape of the subdag for the second leader
        // before committing it, then commit the second leader certificate.
        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();

        // Simulate a bootup of the BFT.

        // Initialize a new instance of storage.
        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);

        // Initialize a new instance of BFT with bootup.
        let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;

        // Sync the BFT DAG at bootup.
        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;

        // Insert the post-shutdown certificates into the storage and DAG of the BFT with bootup.
        for certificate in post_shutdown_certificates.iter() {
            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
        }
        for certificate in post_shutdown_certificates.clone() {
            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
        }
        // Record the subdag shape and its certificates for the second leader before committing it,
        // then commit the second leader certificate.
        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
        let commit_subdag_metadata_bootup =
            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();

        // Check that the final state of both BFTs is the same.

        // Check that both BFTs ended at the same last committed round.
        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());

        // Ensure that both BFTs have committed the leader certificates.
        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
        assert!(
            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
        );

        // Check that both BFTs have committed the pre-shutdown certificates.
        for certificate in pre_shutdown_certificates.clone() {
            let certificate_round = certificate.round();
            let certificate_id = certificate.id();
            // Check that both BFTs have committed the certificates.
            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            // Check that neither BFT contains the certificates in its graph, because
            // they should not need to be ordered again in subsequent subdags.
            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
        }

        // Check that both BFTs have committed the subdag stemming from the second leader certificate,
        // as ordered by the bootup BFT.
        for certificate in committed_certificates_bootup.clone() {
            let certificate_round = certificate.round();
            let certificate_id = certificate.id();
            // Check that both BFTs have committed the certificates.
            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
            // Check that neither BFT contains the certificates in its graph, because
            // they should not need to be ordered again in subsequent subdags.
            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
        }

        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);

        Ok(())
    }
1732
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
        /*
        Scenario:
        1. Run a bootup BFT that syncs with a set of pre-shutdown certificates.
        2. Add post-shutdown certificates to the bootup BFT.
        3. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre-shutdown certificates.
        */

        // `rng` drives all sampling; `rng_pks` is a separate generator, seeded with
        // `DEVELOPMENT_MODE_RNG_SEED`, that is used only to derive the validator private keys.
        let rng = &mut TestRng::default();
        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);

        // The four validator private keys.
        let private_keys = vec![
            PrivateKey::new(rng_pks).unwrap(),
            PrivateKey::new(rng_pks).unwrap(),
            PrivateKey::new(rng_pks).unwrap(),
            PrivateKey::new(rng_pks).unwrap(),
        ];
        // The address of the first key; used below as the leader of both commit rounds.
        let address0 = Address::try_from(private_keys[0])?;

        // Initialize the round parameters.
        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
        let committee_round = 0;
        let commit_round = 2;
        let current_round = commit_round + 1;
        let next_round = current_round + 1;

        // Sample 5 rounds of batch certificates (rounds 0 through `commit_round + 2`) from a static set of 4 authors.
        let (round_to_certificates_map, committee) = {
            let addresses = vec![
                Address::try_from(private_keys[0])?,
                Address::try_from(private_keys[1])?,
                Address::try_from(private_keys[2])?,
                Address::try_from(private_keys[3])?,
            ];
            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
                committee_round,
                addresses,
                rng,
            );
            // Initialize a mapping from the round number to the set of batch certificates in the round.
            let mut round_to_certificates_map: IndexMap<
                u64,
                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
            > = IndexMap::new();
            let mut previous_certificates = IndexSet::with_capacity(4);
            // Initialize the genesis batch certificates.
            // Note: these placeholders are never referenced by a batch header, because rounds 0 and 1
            // use an empty set of previous certificate IDs and `previous_certificates` is overwritten
            // at the end of round 0. Sampling them still consumes values from `rng`.
            for _ in 0..4 {
                previous_certificates.insert(sample_batch_certificate(rng));
            }
            for round in 0..=commit_round + 2 {
                let mut current_certificates = IndexSet::new();
                // Rounds 0 and 1 carry no previous certificate IDs; every later round references
                // all certificates of the preceding round.
                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
                    IndexSet::new()
                } else {
                    previous_certificates.iter().map(|c| c.id()).collect()
                };
                // All authors in a round share the same transmission IDs, timestamp, and committee ID.
                let transmission_ids =
                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
                        .into_iter()
                        .collect::<IndexSet<_>>();
                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
                let committee_id = committee.id();
                // Create one certificate per author for this round.
                for (i, private_key_1) in private_keys.iter().enumerate() {
                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
                        private_key_1,
                        round,
                        timestamp,
                        committee_id,
                        transmission_ids.clone(),
                        previous_certificate_ids.clone(),
                        rng,
                    )
                    .unwrap();
                    // Collect a signature over the batch ID from every key except the author's own.
                    let mut signatures = IndexSet::with_capacity(4);
                    for (j, private_key_2) in private_keys.iter().enumerate() {
                        if i != j {
                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
                        }
                    }
                    let certificate =
                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
                    current_certificates.insert(certificate);
                }
                // Update the mapping.
                round_to_certificates_map.insert(round, current_certificates.clone());
                previous_certificates = current_certificates.clone();
            }
            (round_to_certificates_map, committee)
        };

        // Initialize the ledger.
        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
        // Initialize the storage.
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
        // Get the leaders for the next 2 commit rounds.
        // Both are pinned to the first validator's address instead of being derived from the committee.
        let leader = address0; // committee.get_leader(commit_round).unwrap();
        let next_leader = address0; // committee.get_leader(next_round).unwrap();
        // Collect the pre-shutdown certificates: every certificate of the rounds before `commit_round`
        // (starting at round 1), plus only the leader's certificate of `commit_round`.
        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
        for i in 1..=commit_round {
            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
            if i == commit_round {
                // Only insert the leader certificate for the commit round.
                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
                if let Some(c) = leader_certificate {
                    pre_shutdown_certificates.push(c.clone());
                }
                continue;
            }
            pre_shutdown_certificates.extend(certificates);
        }
        // Insert the pre-shutdown certificates into the storage.
        for certificate in pre_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }
        // Initialize the bootup BFT.
        let account = Account::try_from(private_keys[0])?;
        let bootup_bft =
            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
        // Insert a mock DAG into the bootup BFT.
        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
        // Sync the BFT DAG at bootup.
        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;

        // Collect the post-shutdown certificates: every certificate of rounds `commit_round`
        // through `commit_round + 2`, and insert them into the storage.
        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
            Vec::new();
        for j in commit_round..=commit_round + 2 {
            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
            post_shutdown_certificates.extend(certificate);
        }
        for certificate in post_shutdown_certificates.iter() {
            storage.testing_only_insert_certificate_testing_only(certificate.clone());
        }

        // Insert the post-shutdown certificates into the DAG.
        for certificate in post_shutdown_certificates.clone() {
            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
        }

        // Get the next leader certificate and compute (without committing) the subdag it would commit.
        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
        let committed_certificates = commit_subdag.values().flatten();

        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
            for committed_certificate in committed_certificates.clone() {
                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
            }
        }
        Ok(())
    }
1886}