amareleo_node_bft/
bft.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36    console::account::Address,
37    ledger::{
38        block::Transaction,
39        committee::Committee,
40        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41        puzzle::{Solution, SolutionID},
42    },
43    prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50    parking_lot::{Mutex, RwLock},
51    tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56    collections::{BTreeMap, HashSet},
57    future::Future,
58    sync::{
59        Arc,
60        atomic::{AtomicI64, Ordering},
61    },
62};
63#[cfg(not(feature = "locktick"))]
64use tokio::sync::Mutex as TMutex;
65use tokio::{
66    sync::{OnceCell, oneshot},
67    task::JoinHandle,
68};
69use tracing::subscriber::DefaultGuard;
70
/// The BFT instance: a [`Primary`] together with the DAG and the leader-election state.
///
/// Every field other than `primary` and `tracing` is wrapped in an `Arc`, so clones of a `BFT`
/// share the same DAG, leader certificate, timer, consensus sender, task handles, and lock.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary.
    primary: Primary<N>,
    /// The DAG.
    dag: Arc<RwLock<DAG<N>>>,
    /// The batch certificate of the leader from the current even round, if one was present.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timer for the leader certificate to be received.
    /// Holds the value of `now()` stored when the BFT last advanced to the next round.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The consensus sender, set at most once at the end of `run` (if one was provided).
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// The optional tracing handler, used to obtain tracing guards.
    tracing: Option<TracingHandler>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The BFT lock, acquired while updating the DAG.
    bft_lock: Arc<TMutex<()>>,
}
90
91impl<N: Network> TracingHandlerGuard for BFT<N> {
92    /// Retruns tracing guard
93    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
94        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
95    }
96}
97
98impl<N: Network> BFT<N> {
99    /// Initializes a new instance of the BFT.
100    pub fn new(
101        account: Account<N>,
102        storage: Storage<N>,
103        keep_state: bool,
104        storage_mode: StorageMode,
105        ledger: Arc<dyn LedgerService<N>>,
106        tracing: Option<TracingHandler>,
107    ) -> Result<Self> {
108        Ok(Self {
109            primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
110            dag: Default::default(),
111            leader_certificate: Default::default(),
112            leader_certificate_timer: Default::default(),
113            consensus_sender: Default::default(),
114            tracing: tracing.clone(),
115            handles: Default::default(),
116            bft_lock: Default::default(),
117        })
118    }
119
120    /// Run the BFT instance.
121    pub async fn run(
122        &mut self,
123        consensus_sender: Option<ConsensusSender<N>>,
124        primary_sender: PrimarySender<N>,
125        primary_receiver: PrimaryReceiver<N>,
126    ) -> Result<()> {
127        guard_info!(self, "Starting the BFT instance...");
128
129        // Initialize communications between BFT and Primary.
130        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
131
132        // Start the BFT handlers.
133        self.start_handlers(bft_receiver);
134
135        // Run the primary instance.
136        let result = self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await;
137        if let Err(err) = result {
138            guard_error!(self, "BFT failed to run the primary instance - {err}");
139            self.shut_down().await;
140            return Err(err);
141        }
142
143        // Lastly, set the consensus sender.
144        // Note: This ensures that during initial syncing,
145        // the BFT does not advance the ledger.
146        if let Some(consensus_sender) = consensus_sender {
147            let result = self.consensus_sender.set(consensus_sender);
148            if result.is_err() {
149                self.shut_down().await;
150                guard_error!(self, "Consensus sender already set");
151                bail!("Consensus sender already set");
152            }
153        }
154        Ok(())
155    }
156
157    /// Returns `true` if the primary is synced.
158    pub fn is_synced(&self) -> bool {
159        self.primary.is_synced()
160    }
161
162    /// Returns the primary.
163    pub const fn primary(&self) -> &Primary<N> {
164        &self.primary
165    }
166
167    /// Returns the storage.
168    pub const fn storage(&self) -> &Storage<N> {
169        self.primary.storage()
170    }
171
172    /// Returns the ledger.
173    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
174        self.primary.ledger()
175    }
176
177    /// Returns the leader of the current even round, if one was present.
178    pub fn leader(&self) -> Option<Address<N>> {
179        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
180    }
181
182    /// Returns the certificate of the leader from the current even round, if one was present.
183    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
184        &self.leader_certificate
185    }
186}
187
188impl<N: Network> BFT<N> {
189    /// Returns the number of unconfirmed transmissions.
190    pub fn num_unconfirmed_transmissions(&self) -> usize {
191        self.primary.num_unconfirmed_transmissions()
192    }
193
194    /// Returns the number of unconfirmed ratifications.
195    pub fn num_unconfirmed_ratifications(&self) -> usize {
196        self.primary.num_unconfirmed_ratifications()
197    }
198
199    /// Returns the number of solutions.
200    pub fn num_unconfirmed_solutions(&self) -> usize {
201        self.primary.num_unconfirmed_solutions()
202    }
203
204    /// Returns the number of unconfirmed transactions.
205    pub fn num_unconfirmed_transactions(&self) -> usize {
206        self.primary.num_unconfirmed_transactions()
207    }
208}
209
210impl<N: Network> BFT<N> {
211    /// Returns the worker transmission IDs.
212    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
213        self.primary.worker_transmission_ids()
214    }
215
216    /// Returns the worker transmissions.
217    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
218        self.primary.worker_transmissions()
219    }
220
221    /// Returns the worker solutions.
222    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
223        self.primary.worker_solutions()
224    }
225
226    /// Returns the worker transactions.
227    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
228        self.primary.worker_transactions()
229    }
230}
231
232impl<N: Network> BFT<N> {
233    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
234    fn update_to_next_round(&self, current_round: u64) -> bool {
235        // Ensure the current round is at least the storage round (this is a sanity check).
236        let storage_round = self.storage().current_round();
237        if current_round < storage_round {
238            guard_debug!(
239                self,
240                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
241            );
242            return false;
243        }
244
245        // Determine if the BFT is ready to update to the next round.
246        let is_ready = match current_round % 2 == 0 {
247            true => self.update_leader_certificate_to_even_round(current_round),
248            false => self.is_leader_quorum_or_nonleaders_available(current_round),
249        };
250
251        #[cfg(feature = "metrics")]
252        {
253            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
254            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
255            if start > 0 {
256                let end = now();
257                let elapsed = std::time::Duration::from_secs((end - start) as u64);
258                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
259            }
260        }
261
262        // Log whether the round is going to update.
263        if current_round % 2 == 0 {
264            // Determine if there is a leader certificate.
265            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
266                // Ensure the state of the leader certificate is consistent with the BFT being ready.
267                if !is_ready {
268                    guard_trace!(self, is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
269                }
270                // Log the leader election.
271                let leader_round = leader_certificate.round();
272                match leader_round == current_round {
273                    true => {
274                        guard_info!(
275                            self,
276                            "\n\nRound {current_round} elected a leader - {}\n",
277                            leader_certificate.author()
278                        );
279                        #[cfg(feature = "metrics")]
280                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
281                    }
282                    false => {
283                        guard_warn!(self, "BFT failed to elect a leader for round {current_round} (!= {leader_round})")
284                    }
285                }
286            } else {
287                match is_ready {
288                    true => guard_info!(self, "\n\nRound {current_round} reached quorum without a leader\n"),
289                    false => {
290                        guard_info!(self, "{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed())
291                    }
292                }
293            }
294        }
295
296        // If the BFT is ready, then update to the next round.
297        if is_ready {
298            // Update to the next round in storage.
299            if let Err(e) = self.storage().increment_to_next_round(current_round) {
300                guard_warn!(self, "BFT failed to increment to the next round from round {current_round} - {e}");
301                return false;
302            }
303            // Update the timer for the leader certificate.
304            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
305        }
306
307        is_ready
308    }
309
310    /// Updates the leader certificate to the current even round,
311    /// returning `true` if the BFT is ready to update to the next round.
312    ///
313    /// This method runs on every even round, by determining the leader of the current even round,
314    /// and setting the leader certificate to their certificate in the round, if they were present.
315    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
316        // Retrieve the current round.
317        let current_round = self.storage().current_round();
318        // Ensure the current round matches the given round.
319        if current_round != even_round {
320            guard_warn!(
321                self,
322                "BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"
323            );
324            return false;
325        }
326
327        // If the current round is odd, return false.
328        if current_round % 2 != 0 || current_round < 2 {
329            guard_error!(self, "BFT cannot update the leader certificate in an odd round");
330            return false;
331        }
332
333        // Retrieve the certificates for the current round.
334        let current_certificates = self.storage().get_certificates_for_round(current_round);
335        // If there are no current certificates, set the leader certificate to 'None', and return early.
336        if current_certificates.is_empty() {
337            // Set the leader certificate to 'None'.
338            *self.leader_certificate.write() = None;
339            return false;
340        }
341
342        // Retrieve the committee lookback of the current round.
343        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
344            Ok(committee) => committee,
345            Err(e) => {
346                guard_error!(
347                    self,
348                    "BFT failed to retrieve the committee lookback for the even round {current_round} - {e}"
349                );
350                return false;
351            }
352        };
353        // Determine the leader of the current round.
354        let leader = match self.ledger().latest_leader() {
355            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
356            _ => {
357                // Compute the leader for the current round.
358                let computed_leader = self.primary.account().address();
359                // let computed_leader = match committee_lookback.get_leader(current_round) {
360                //     Ok(leader) => leader,
361                //     Err(e) => {
362                //         guard_error!(self, "BFT failed to compute the leader for the even round {current_round} - {e}");
363                //         return false;
364                //     }
365                // };
366
367                // Cache the computed leader.
368                self.ledger().update_latest_leader(current_round, computed_leader);
369
370                computed_leader
371            }
372        };
373        // Find and set the leader certificate, if the leader was present in the current even round.
374        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
375        *self.leader_certificate.write() = leader_certificate.cloned();
376
377        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
378    }
379
380    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
381    ///  - If the leader certificate is set for the current even round.
382    ///  - The timer for the leader certificate has expired.
383    fn is_even_round_ready_for_next_round(
384        &self,
385        certificates: IndexSet<BatchCertificate<N>>,
386        committee: Committee<N>,
387        current_round: u64,
388    ) -> bool {
389        // Retrieve the authors for the current round.
390        let authors = certificates.into_iter().map(|c| c.author()).collect();
391        // Check if quorum threshold is reached.
392        if !committee.is_quorum_threshold_reached(&authors) {
393            guard_trace!(self, "BFT failed to reach quorum threshold in even round {current_round}");
394            return false;
395        }
396        // If the leader certificate is set for the current even round, return 'true'.
397        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
398            if leader_certificate.round() == current_round {
399                return true;
400            }
401        }
402        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
403        if self.is_timer_expired() {
404            guard_debug!(
405                self,
406                "BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"
407            );
408            return true;
409        }
410        // Otherwise, return 'false'.
411        false
412    }
413
414    /// Returns `true` if the timer for the leader certificate has expired.
415    fn is_timer_expired(&self) -> bool {
416        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
417    }
418
419    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
420    ///  - The leader certificate is `None`.
421    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
422    ///  - The leader certificate timer has expired.
423    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
424        // Retrieve the current round.
425        let current_round = self.storage().current_round();
426        // Ensure the current round matches the given round.
427        if current_round != odd_round {
428            guard_warn!(
429                self,
430                "BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"
431            );
432            return false;
433        }
434        // If the current round is even, return false.
435        if current_round % 2 != 1 {
436            guard_error!(self, "BFT does not compute stakes for the leader certificate in an even round");
437            return false;
438        }
439        // Retrieve the certificates for the current round.
440        let current_certificates = self.storage().get_certificates_for_round(current_round);
441        // Retrieve the committee lookback for the current round.
442        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
443            Ok(committee) => committee,
444            Err(e) => {
445                guard_error!(
446                    self,
447                    "BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}"
448                );
449                return false;
450            }
451        };
452        // Retrieve the authors of the current certificates.
453        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
454        // Check if quorum threshold is reached.
455        if !committee_lookback.is_quorum_threshold_reached(&authors) {
456            guard_trace!(self, "BFT failed reach quorum threshold in odd round {current_round}. ");
457            return false;
458        }
459        // Retrieve the leader certificate.
460        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
461            // If there is no leader certificate for the previous round, return 'true'.
462            return true;
463        };
464        // Compute the stake for the leader certificate.
465        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
466            leader_certificate.id(),
467            current_certificates,
468            &committee_lookback,
469        );
470        // Return 'true' if any of the following conditions hold:
471        stake_with_leader >= committee_lookback.availability_threshold()
472            || stake_without_leader >= committee_lookback.quorum_threshold()
473            || self.is_timer_expired()
474    }
475
476    /// Computes the amount of stake that has & has not signed for the leader certificate.
477    fn compute_stake_for_leader_certificate(
478        &self,
479        leader_certificate_id: Field<N>,
480        current_certificates: IndexSet<BatchCertificate<N>>,
481        current_committee: &Committee<N>,
482    ) -> (u64, u64) {
483        // If there are no current certificates, return early.
484        if current_certificates.is_empty() {
485            return (0, 0);
486        }
487
488        // Initialize a tracker for the stake with the leader.
489        let mut stake_with_leader = 0u64;
490        // Initialize a tracker for the stake without the leader.
491        let mut stake_without_leader = 0u64;
492        // Iterate over the current certificates.
493        for certificate in current_certificates {
494            // Retrieve the stake for the author of the certificate.
495            let stake = current_committee.get_stake(certificate.author());
496            // Determine if the certificate includes the leader.
497            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
498                // If the certificate includes the leader, add the stake to the stake with the leader.
499                true => stake_with_leader = stake_with_leader.saturating_add(stake),
500                // If the certificate does not include the leader, add the stake to the stake without the leader.
501                false => stake_without_leader = stake_without_leader.saturating_add(stake),
502            }
503        }
504        // Return the stake with the leader, and the stake without the leader.
505        (stake_with_leader, stake_without_leader)
506    }
507}
508
509impl<N: Network> BFT<N> {
510    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
511    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
512        &self,
513        certificate: BatchCertificate<N>,
514    ) -> Result<()> {
515        // Acquire the BFT lock.
516        let _lock = self.bft_lock.lock().await;
517        // Retrieve the certificate round.
518        let certificate_round = certificate.round();
519        // Insert the certificate into the DAG.
520        self.dag.write().insert(certificate, self);
521
522        // Construct the commit round.
523        let commit_round = certificate_round.saturating_sub(1);
524        // If the commit round is odd, return early.
525        if commit_round % 2 != 0 || commit_round < 2 {
526            return Ok(());
527        }
528        // If the commit round is at or below the last committed round, return early.
529        if commit_round <= self.dag.read().last_committed_round() {
530            return Ok(());
531        }
532
533        /* Proceeding to check if the leader is ready to be committed. */
534        guard_info!(self, "Checking if the leader is ready to be committed for round {commit_round}...");
535
536        // Retrieve the committee lookback for the commit round.
537        let Ok(_committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
538            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
539        };
540
541        // Either retrieve the cached leader or compute it.
542        let leader = match self.ledger().latest_leader() {
543            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
544            _ => {
545                // Compute the leader for the commit round.
546                let computed_leader = self.primary.account().address();
547                // let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
548                //     bail!("BFT failed to compute the leader for commit round {commit_round}");
549                // };
550
551                // Cache the computed leader.
552                self.ledger().update_latest_leader(commit_round, computed_leader);
553
554                computed_leader
555            }
556        };
557
558        // Retrieve the leader certificate for the commit round.
559        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
560        else {
561            guard_trace!(self, "BFT did not find the leader certificate for commit round {commit_round} yet");
562            return Ok(());
563        };
564        // Retrieve all of the certificates for the **certificate** round.
565        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
566            // TODO (howardwu): Investigate how many certificates we should have at this point.
567            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
568        };
569        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
570        let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
571        else {
572            bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
573        };
574        // Construct a set over the authors who included the leader's certificate in the certificate round.
575        let authors = certificates
576            .values()
577            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
578                true => Some(c.author()),
579                false => None,
580            })
581            .collect();
582        // Check if the leader is ready to be committed.
583        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
584            // If the leader is not ready to be committed, return early.
585            guard_trace!(self, "BFT is not ready to commit {commit_round}");
586            return Ok(());
587        }
588
589        /* Proceeding to commit the leader. */
590        guard_info!(self, "Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
591
592        // Commit the leader certificate, and all previous leader certificates since the last committed round.
593        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
594    }
595
596    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
597    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
598        &self,
599        leader_certificate: BatchCertificate<N>,
600    ) -> Result<()> {
601        // Fetch the leader round.
602        let latest_leader_round = leader_certificate.round();
603        // Determine the list of all previous leader certificates since the last committed round.
604        // The order of the leader certificates is from **newest** to **oldest**.
605        let mut leader_certificates = vec![leader_certificate.clone()];
606        {
607            // Retrieve the leader round.
608            let leader_round = leader_certificate.round();
609
610            let mut current_certificate = leader_certificate;
611            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
612            {
613                // // Retrieve the previous committee for the leader round.
614                // let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
615                //     Ok(committee) => committee,
616                //     Err(e) => {
617                //         bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
618                //     }
619                // };
620
621                // Either retrieve the cached leader or compute it.
622                let leader = match self.ledger().latest_leader() {
623                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
624                    _ => {
625                        // Compute the leader for the commit round.
626                        let computed_leader = self.primary.account().address();
627                        // let computed_leader = match previous_committee_lookback.get_leader(round) {
628                        //     Ok(leader) => leader,
629                        //     Err(e) => {
630                        //         bail!("BFT failed to compute the leader for the even round {round} - {e}");
631                        //     }
632                        // };
633
634                        // Cache the computed leader.
635                        self.ledger().update_latest_leader(round, computed_leader);
636
637                        computed_leader
638                    }
639                };
640                // Retrieve the previous leader certificate.
641                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
642                else {
643                    continue;
644                };
645                // Determine if there is a path between the previous certificate and the current certificate.
646                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
647                    // Add the previous leader certificate to the list of certificates to commit.
648                    leader_certificates.push(previous_certificate.clone());
649                    // Update the current certificate to the previous leader certificate.
650                    current_certificate = previous_certificate;
651                }
652            }
653        }
654
655        // Iterate over the leader certificates to commit.
656        for leader_certificate in leader_certificates.into_iter().rev() {
657            // Retrieve the leader certificate round.
658            let leader_round = leader_certificate.round();
659            // Compute the commit subdag.
660            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
661                Ok(subdag) => subdag,
662                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
663            };
664            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
665            if !IS_SYNCING {
666                // Initialize a map for the deduped transmissions.
667                let mut transmissions = IndexMap::new();
668                // Initialize a map for the deduped transaction ids.
669                let mut seen_transaction_ids = IndexSet::new();
670                // Initialize a map for the deduped solution ids.
671                let mut seen_solution_ids = IndexSet::new();
672                // Start from the oldest leader certificate.
673                for certificate in commit_subdag.values().flatten() {
674                    // Retrieve the transmissions.
675                    for transmission_id in certificate.transmission_ids() {
676                        // If the transaction ID or solution ID already exists in the map, skip it.
677                        // Note: This additional check is done to ensure that we do not include duplicate
678                        // transaction IDs or solution IDs that may have a different transmission ID.
679                        match transmission_id {
680                            TransmissionID::Solution(solution_id, _) => {
681                                // If the solution already exists, skip it.
682                                if seen_solution_ids.contains(&solution_id) {
683                                    continue;
684                                }
685                            }
686                            TransmissionID::Transaction(transaction_id, _) => {
687                                // If the transaction already exists, skip it.
688                                if seen_transaction_ids.contains(transaction_id) {
689                                    continue;
690                                }
691                            }
692                            TransmissionID::Ratification => {
693                                bail!("Ratifications are currently not supported in the BFT.")
694                            }
695                        }
696                        // If the transmission already exists in the map, skip it.
697                        if transmissions.contains_key(transmission_id) {
698                            continue;
699                        }
700                        // If the transmission already exists in the ledger, skip it.
701                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
702                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
703                            continue;
704                        }
705                        // Retrieve the transmission.
706                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
707                            bail!(
708                                "BFT failed to retrieve transmission '{}.{}' from round {}",
709                                fmt_id(transmission_id),
710                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
711                                certificate.round()
712                            );
713                        };
714                        // Insert the transaction ID or solution ID into the map.
715                        match transmission_id {
716                            TransmissionID::Solution(id, _) => {
717                                seen_solution_ids.insert(id);
718                            }
719                            TransmissionID::Transaction(id, _) => {
720                                seen_transaction_ids.insert(id);
721                            }
722                            TransmissionID::Ratification => {}
723                        }
724                        // Add the transmission to the set.
725                        transmissions.insert(*transmission_id, transmission);
726                    }
727                }
728                // Trigger consensus, as this will build a new block for the ledger.
729                // Construct the subdag.
730                let subdag = Subdag::from(commit_subdag.clone())?;
731                // Retrieve the anchor round.
732                let anchor_round = subdag.anchor_round();
733                // Retrieve the number of transmissions.
734                let num_transmissions = transmissions.len();
735                // Retrieve metadata about the subdag.
736                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
737
738                // Ensure the subdag anchor round matches the leader round.
739                ensure!(
740                    anchor_round == leader_round,
741                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
742                );
743
744                // Trigger consensus.
745                if let Some(consensus_sender) = self.consensus_sender.get() {
746                    // Initialize a callback sender and receiver.
747                    let (callback_sender, callback_receiver) = oneshot::channel();
748                    // Send the subdag and transmissions to consensus.
749                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
750                    // Await the callback to continue.
751                    match callback_receiver.await {
752                        Ok(Ok(())) => (), // continue
753                        Ok(Err(e)) => {
754                            guard_error!(self, "BFT failed to advance the subdag for round {anchor_round} - {e}");
755                            return Ok(());
756                        }
757                        Err(e) => {
758                            guard_error!(self, "BFT failed to receive the callback for round {anchor_round} - {e}");
759                            return Ok(());
760                        }
761                    }
762                }
763
764                guard_info!(
765                    self,
766                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
767                );
768            }
769
770            // Update the DAG, as the subdag was successfully included into a block.
771            let mut dag_write = self.dag.write();
772            for certificate in commit_subdag.values().flatten() {
773                dag_write.commit(certificate, self.storage().max_gc_rounds());
774            }
775        }
776
777        // Perform garbage collection based on the latest committed leader round.
778        // The protocol guarantees that validators commit the same anchors in the same order,
779        // but they may do so in different chunks of anchors,
780        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
781        // Doing garbage collection at the end of each chunk (as we do here),
782        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
783        // may give rise to a discrepancy between the DAGs of different validators who commit different chunks:
784        // one validator may have more certificates than the other, not yet garbage collected.
785        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
786        // it excludes certificates that are below the GC round,
787        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
788        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
789        // so long as garbage collection is done after each chunk.
790        // If garbage collection were done after each committed certificate,
791        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
792        self.storage().garbage_collect_certificates(latest_leader_round);
793
794        Ok(())
795    }
796
797    /// Returns the subdag of batch certificates to commit.
798    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
799        &self,
800        leader_certificate: BatchCertificate<N>,
801    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
802        // Initialize a map for the certificates to commit.
803        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
804        // Initialize a set for the already ordered certificates.
805        let mut already_ordered = HashSet::new();
806        // Initialize a buffer for the certificates to order.
807        let mut buffer = vec![leader_certificate];
808        // Iterate over the certificates to order.
809        while let Some(certificate) = buffer.pop() {
810            // Insert the certificate into the map.
811            commit.entry(certificate.round()).or_default().insert(certificate.clone());
812
813            // Check if the previous certificate is below the GC round.
814            // This is currently a critical check to prevent forking,
815            // as explained in the comment at the end of `commit_leader_certificate()`,
816            // just before the call to garbage collection.
817            let previous_round = certificate.round().saturating_sub(1);
818            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
819                continue;
820            }
821            // Iterate over the previous certificate IDs.
822            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
823            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
824            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
825                // If the previous certificate is already ordered, continue.
826                if already_ordered.contains(previous_certificate_id) {
827                    continue;
828                }
829                // If the previous certificate was recently committed, continue.
830                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
831                    continue;
832                }
833                // If the previous certificate already exists in the ledger, continue.
834                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
835                    continue;
836                }
837
838                // Retrieve the previous certificate.
839                let previous_certificate = {
840                    // Start by retrieving the previous certificate from the DAG.
841                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
842                        // If the previous certificate is found, return it.
843                        Some(previous_certificate) => previous_certificate,
844                        // If the previous certificate is not found, retrieve it from the storage.
845                        None => match self.storage().get_certificate(*previous_certificate_id) {
846                            // If the previous certificate is found, return it.
847                            Some(previous_certificate) => previous_certificate,
848                            // Otherwise, the previous certificate is missing, and throw an error.
849                            None => bail!(
850                                "Missing previous certificate {} for round {previous_round}",
851                                fmt_id(previous_certificate_id)
852                            ),
853                        },
854                    }
855                };
856                // Insert the previous certificate into the set of already ordered certificates.
857                already_ordered.insert(previous_certificate.id());
858                // Insert the previous certificate into the buffer.
859                buffer.push(previous_certificate);
860            }
861        }
862        // Ensure we only retain certificates that are above the GC round.
863        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
864        // Return the certificates to commit.
865        Ok(commit)
866    }
867
868    /// Returns `true` if there is a path from the previous certificate to the current certificate.
869    fn is_linked(
870        &self,
871        previous_certificate: BatchCertificate<N>,
872        current_certificate: BatchCertificate<N>,
873    ) -> Result<bool> {
874        // Initialize the list containing the traversal.
875        let mut traversal = vec![current_certificate.clone()];
876        // Iterate over the rounds from the current certificate to the previous certificate.
877        for round in (previous_certificate.round()..current_certificate.round()).rev() {
878            // Retrieve all of the certificates for this past round.
879            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
880                // This is a critical error, as the traversal should have these certificates.
881                // If this error is hit, it is likely that the maximum GC rounds should be increased.
882                bail!("BFT failed to retrieve the certificates for past round {round}");
883            };
884            // Filter the certificates to only include those that are in the traversal.
885            traversal = certificates
886                .into_values()
887                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
888                .collect();
889        }
890        Ok(traversal.contains(&previous_certificate))
891    }
892}
893
894impl<N: Network> BFT<N> {
895    /// Starts the BFT handlers.
896    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
897        let BFTReceiver { mut rx_primary_round, mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup } =
898            bft_receiver;
899
900        // Process the current round from the primary.
901        let self_ = self.clone();
902        self.spawn(async move {
903            while let Some((current_round, callback)) = rx_primary_round.recv().await {
904                callback.send(self_.update_to_next_round(current_round)).ok();
905            }
906        });
907
908        // Process the certificate from the primary.
909        let self_ = self.clone();
910        self.spawn(async move {
911            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
912                // Update the DAG with the certificate.
913                let result = self_.update_dag::<true, false>(certificate).await;
914                // Send the callback **after** updating the DAG.
915                // Note: We must await the DAG update before proceeding.
916                callback.send(result).ok();
917            }
918        });
919
920        // Process the request to sync the BFT DAG at bootup.
921        let self_ = self.clone();
922        self.spawn(async move {
923            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
924                self_.sync_bft_dag_at_bootup(certificates).await;
925            }
926        });
927    }
928
929    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
930    /// already exist in the ledger.
931    ///
932    /// This method commits all the certificates into the DAG.
933    /// Note that there is no need to insert the certificates into the DAG, because these certificates
934    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
935    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
936        // Acquire the BFT write lock.
937        let mut dag = self.dag.write();
938
939        // Commit all the certificates.
940        for certificate in certificates {
941            dag.commit(&certificate, self.storage().max_gc_rounds());
942        }
943    }
944
945    /// Spawns a task with the given future; it should only be used for long-running tasks.
946    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
947        self.handles.lock().push(tokio::spawn(future));
948    }
949
950    /// Shuts down the BFT.
951    pub async fn shut_down(&self) {
952        guard_info!(self, "Shutting down the BFT...");
953        // Acquire the lock.
954        let _lock = self.bft_lock.lock().await;
955
956        // Shut down the primary.
957        self.primary.shut_down().await;
958
959        // Abort BFT tasks.
960        let mut handles = self.handles.lock();
961        handles.iter().for_each(|handle| handle.abort());
962        handles.clear();
963    }
964}
965
966#[cfg(test)]
967mod tests {
968    use crate::{
969        BFT,
970        DEVELOPMENT_MODE_RNG_SEED,
971        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
972        helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
973    };
974
975    use amareleo_chain_account::Account;
976    use amareleo_node_bft_ledger_service::MockLedgerService;
977    use amareleo_node_bft_storage_service::BFTMemoryService;
978    use snarkvm::{
979        console::account::{Address, PrivateKey},
980        ledger::{
981            committee::Committee,
982            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
983        },
984        utilities::TestRng,
985    };
986
987    use aleo_std::StorageMode;
988    use anyhow::Result;
989    use indexmap::{IndexMap, IndexSet};
990    use rand::SeedableRng;
991    use rand_chacha::ChaChaRng;
992    use std::sync::Arc;
993
994    type CurrentNetwork = snarkvm::console::network::MainnetV0;
995
996    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
997    fn sample_test_instance(
998        committee_round: Option<u64>,
999        max_gc_rounds: u64,
1000        rng: &mut TestRng,
1001    ) -> (
1002        Committee<CurrentNetwork>,
1003        Account<CurrentNetwork>,
1004        Arc<MockLedgerService<CurrentNetwork>>,
1005        Storage<CurrentNetwork>,
1006    ) {
1007        let committee = match committee_round {
1008            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
1009            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
1010        };
1011        let account = Account::new(rng).unwrap();
1012        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1013        let transmissions = Arc::new(BFTMemoryService::new());
1014        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1015
1016        (committee, account, ledger, storage)
1017    }
1018
1019    #[test]
1020    #[tracing_test::traced_test]
1021    fn test_is_leader_quorum_odd() -> Result<()> {
1022        let rng = &mut TestRng::default();
1023
1024        // Sample batch certificates.
1025        let mut certificates = IndexSet::new();
1026        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1027        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1028        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1029        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1030
1031        // Initialize the committee.
1032        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1033            1,
1034            vec![
1035                certificates[0].author(),
1036                certificates[1].author(),
1037                certificates[2].author(),
1038                certificates[3].author(),
1039            ],
1040            rng,
1041        );
1042
1043        // Initialize the ledger.
1044        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1045        // Initialize the storage.
1046        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1047        // Initialize the account.
1048        let account = Account::new(rng)?;
1049        // Initialize the BFT.
1050        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1051        assert!(bft.is_timer_expired());
1052        // Ensure this call succeeds on an odd round.
1053        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1054        // If timer has expired but quorum threshold is not reached, return 'false'.
1055        assert!(!result);
1056        // Insert certificates into storage.
1057        for certificate in certificates.iter() {
1058            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1059        }
1060        // Ensure this call succeeds on an odd round.
1061        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1062        assert!(result); // no previous leader certificate
1063        // Set the leader certificate.
1064        let leader_certificate = sample_batch_certificate(rng);
1065        *bft.leader_certificate.write() = Some(leader_certificate);
1066        // Ensure this call succeeds on an odd round.
1067        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1068        assert!(result); // should now fall through to the end of function
1069
1070        Ok(())
1071    }
1072
1073    #[test]
1074    #[tracing_test::traced_test]
1075    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1076        let rng = &mut TestRng::default();
1077
1078        // Sample the test instance.
1079        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1080        assert_eq!(committee.starting_round(), 1);
1081        assert_eq!(storage.current_round(), 1);
1082        assert_eq!(storage.max_gc_rounds(), 10);
1083
1084        // Initialize the BFT.
1085        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1086        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1087
1088        // Store is at round 1, and we are checking for round 2.
1089        // Ensure this call fails on an even round.
1090        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1091        assert!(!result);
1092        Ok(())
1093    }
1094
1095    #[test]
1096    #[tracing_test::traced_test]
1097    fn test_is_leader_quorum_even() -> Result<()> {
1098        let rng = &mut TestRng::default();
1099
1100        // Sample the test instance.
1101        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1102        assert_eq!(committee.starting_round(), 2);
1103        assert_eq!(storage.current_round(), 2);
1104        assert_eq!(storage.max_gc_rounds(), 10);
1105
1106        // Initialize the BFT.
1107        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1108        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1109
1110        // Ensure this call fails on an even round.
1111        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1112        assert!(!result);
1113        Ok(())
1114    }
1115
1116    #[test]
1117    #[tracing_test::traced_test]
1118    fn test_is_even_round_ready() -> Result<()> {
1119        let rng = &mut TestRng::default();
1120
1121        // Sample batch certificates.
1122        let mut certificates = IndexSet::new();
1123        certificates.insert(sample_batch_certificate_for_round(2, rng));
1124        certificates.insert(sample_batch_certificate_for_round(2, rng));
1125        certificates.insert(sample_batch_certificate_for_round(2, rng));
1126        certificates.insert(sample_batch_certificate_for_round(2, rng));
1127
1128        // Initialize the committee.
1129        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1130            2,
1131            vec![
1132                certificates[0].author(),
1133                certificates[1].author(),
1134                certificates[2].author(),
1135                certificates[3].author(),
1136            ],
1137            rng,
1138        );
1139
1140        // Initialize the ledger.
1141        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1142        // Initialize the storage.
1143        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1144        // Initialize the account.
1145        let account = Account::new(rng)?;
1146        // Initialize the BFT.
1147        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1148        // Set the leader certificate.
1149        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1150        *bft.leader_certificate.write() = Some(leader_certificate);
1151        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1152        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1153        assert!(!result);
1154        // Once quorum threshold is reached, we are ready for the next round.
1155        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1156        assert!(result);
1157
1158        // Initialize a new BFT.
1159        let bft_timer =
1160            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1161        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1162        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1163        if !bft_timer.is_timer_expired() {
1164            assert!(!result);
1165        }
1166        // Wait for the timer to expire.
1167        let leader_certificate_timeout =
1168            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1169        std::thread::sleep(leader_certificate_timeout);
1170        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1171        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1172        if bft_timer.is_timer_expired() {
1173            assert!(result);
1174        } else {
1175            assert!(!result);
1176        }
1177
1178        Ok(())
1179    }
1180
1181    #[test]
1182    #[tracing_test::traced_test]
1183    fn test_update_leader_certificate_odd() -> Result<()> {
1184        let rng = &mut TestRng::default();
1185
1186        // Sample the test instance.
1187        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1188        assert_eq!(storage.max_gc_rounds(), 10);
1189
1190        // Initialize the BFT.
1191        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1192
1193        // Ensure this call fails on an odd round.
1194        let result = bft.update_leader_certificate_to_even_round(1);
1195        assert!(!result);
1196        Ok(())
1197    }
1198
1199    #[test]
1200    #[tracing_test::traced_test]
1201    fn test_update_leader_certificate_bad_round() -> Result<()> {
1202        let rng = &mut TestRng::default();
1203
1204        // Sample the test instance.
1205        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1206        assert_eq!(storage.max_gc_rounds(), 10);
1207
1208        // Initialize the BFT.
1209        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1210
1211        // Ensure this call succeeds on an even round.
1212        let result = bft.update_leader_certificate_to_even_round(6);
1213        assert!(!result);
1214        Ok(())
1215    }
1216
1217    #[test]
1218    #[tracing_test::traced_test]
1219    fn test_update_leader_certificate_even() -> Result<()> {
1220        let rng = &mut TestRng::default();
1221
1222        // Set the current round.
1223        let current_round = 3;
1224
1225        // Sample the certificates.
1226        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1227            current_round,
1228            rng,
1229        );
1230
1231        // Initialize the committee.
1232        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1233            2,
1234            vec![
1235                certificates[0].author(),
1236                certificates[1].author(),
1237                certificates[2].author(),
1238                certificates[3].author(),
1239            ],
1240            rng,
1241        );
1242
1243        // Initialize the ledger.
1244        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1245
1246        // Initialize the storage.
1247        let transmissions = Arc::new(BFTMemoryService::new());
1248        let storage = Storage::new(ledger.clone(), transmissions, 10, None);
1249        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1250        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1251        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1252        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1253        assert_eq!(storage.current_round(), 2);
1254
1255        // Retrieve the leader certificate.
1256        let leader = certificates[0].author(); // committee.get_leader(2).unwrap();
1257        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1258
1259        // Initialize the BFT.
1260        let account = Account::new(rng)?;
1261        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1262
1263        // Set the leader certificate.
1264        *bft.leader_certificate.write() = Some(leader_certificate);
1265
1266        // Update the leader certificate.
1267        // Ensure this call succeeds on an even round.
1268        let result = bft.update_leader_certificate_to_even_round(2);
1269        assert!(result);
1270
1271        Ok(())
1272    }
1273
1274    #[tokio::test]
1275    #[tracing_test::traced_test]
1276    async fn test_order_dag_with_dfs() -> Result<()> {
1277        let rng = &mut TestRng::default();
1278
1279        // Sample the test instance.
1280        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1281
1282        // Initialize the round parameters.
1283        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1284        let current_round = previous_round + 1;
1285
1286        // Sample the current certificate and previous certificates.
1287        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1288            current_round,
1289            rng,
1290        );
1291
1292        /* Test GC */
1293
1294        // Ensure the function succeeds in returning only certificates above GC.
1295        {
1296            // Initialize the storage.
1297            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1298            // Initialize the BFT.
1299            let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1300
1301            // Insert a mock DAG in the BFT.
1302            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1303
1304            // Insert the previous certificates into the BFT.
1305            for certificate in previous_certificates.clone() {
1306                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1307            }
1308
1309            // Ensure this call succeeds and returns all given certificates.
1310            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1311            assert!(result.is_ok());
1312            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1313            assert_eq!(candidate_certificates.len(), 1);
1314            let expected_certificates = vec![certificate.clone()];
1315            assert_eq!(
1316                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1317                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1318            );
1319            assert_eq!(candidate_certificates, expected_certificates);
1320        }
1321
1322        /* Test normal case */
1323
1324        // Ensure the function succeeds in returning all given certificates.
1325        {
1326            // Initialize the storage.
1327            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1328            // Initialize the BFT.
1329            let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1330
1331            // Insert a mock DAG in the BFT.
1332            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1333
1334            // Insert the previous certificates into the BFT.
1335            for certificate in previous_certificates.clone() {
1336                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1337            }
1338
1339            // Ensure this call succeeds and returns all given certificates.
1340            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1341            assert!(result.is_ok());
1342            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1343            assert_eq!(candidate_certificates.len(), 5);
1344            let expected_certificates = vec![
1345                previous_certificates[0].clone(),
1346                previous_certificates[1].clone(),
1347                previous_certificates[2].clone(),
1348                previous_certificates[3].clone(),
1349                certificate,
1350            ];
1351            assert_eq!(
1352                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1353                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1354            );
1355            assert_eq!(candidate_certificates, expected_certificates);
1356        }
1357
1358        Ok(())
1359    }
1360
1361    #[test]
1362    #[tracing_test::traced_test]
1363    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1364        let rng = &mut TestRng::default();
1365
1366        // Sample the test instance.
1367        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1368        assert_eq!(committee.starting_round(), 1);
1369        assert_eq!(storage.current_round(), 1);
1370        assert_eq!(storage.max_gc_rounds(), 1);
1371
1372        // Initialize the round parameters.
1373        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1374        let current_round = previous_round + 1;
1375
1376        // Sample the current certificate and previous certificates.
1377        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1378            current_round,
1379            rng,
1380        );
1381        // Construct the previous certificate IDs.
1382        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1383
1384        /* Test missing previous certificate. */
1385
1386        // Initialize the BFT.
1387        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1388
1389        // The expected error message.
1390        let error_msg = format!(
1391            "Missing previous certificate {} for round {previous_round}",
1392            crate::helpers::fmt_id(previous_certificate_ids[3]),
1393        );
1394
1395        // Ensure this call fails on a missing previous certificate.
1396        let result = bft.order_dag_with_dfs::<false>(certificate);
1397        assert!(result.is_err());
1398        assert_eq!(result.unwrap_err().to_string(), error_msg);
1399        Ok(())
1400    }
1401
1402    #[tokio::test]
1403    #[tracing_test::traced_test]
1404    async fn test_bft_gc_on_commit() -> Result<()> {
1405        let rng = &mut TestRng::default();
1406
1407        // Initialize the round parameters.
1408        let max_gc_rounds = 1;
1409        let committee_round = 0;
1410        let commit_round = 2;
1411        let current_round = commit_round + 1;
1412
1413        // Sample the certificates.
1414        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1415            current_round,
1416            rng,
1417        );
1418
1419        // Initialize the committee.
1420        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1421            committee_round,
1422            vec![
1423                certificates[0].author(),
1424                certificates[1].author(),
1425                certificates[2].author(),
1426                certificates[3].author(),
1427            ],
1428            rng,
1429        );
1430
1431        // Initialize the ledger.
1432        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1433
1434        // Initialize the storage.
1435        let transmissions = Arc::new(BFTMemoryService::new());
1436        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1437        // Insert the certificates into the storage.
1438        for certificate in certificates.iter() {
1439            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1440        }
1441
1442        // Get the leader certificate.
1443        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1444        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1445
1446        // Initialize the BFT.
1447        let account = Account::new(rng)?;
1448        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1449        // Insert a mock DAG in the BFT.
1450        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1451
1452        // Ensure that the `gc_round` has not been updated yet.
1453        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1454
1455        // Insert the certificates into the BFT.
1456        for certificate in certificates {
1457            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1458        }
1459
1460        // Commit the leader certificate.
1461        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1462
1463        // Ensure that the `gc_round` has been updated.
1464        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1465
1466        Ok(())
1467    }
1468
1469    #[tokio::test]
1470    #[tracing_test::traced_test]
1471    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1472        let rng = &mut TestRng::default();
1473
1474        // Initialize the round parameters.
1475        let max_gc_rounds = 1;
1476        let committee_round = 0;
1477        let commit_round = 2;
1478        let current_round = commit_round + 1;
1479
1480        // Sample the current certificate and previous certificates.
1481        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1482            current_round,
1483            rng,
1484        );
1485
1486        // Initialize the committee.
1487        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1488            committee_round,
1489            vec![
1490                certificates[0].author(),
1491                certificates[1].author(),
1492                certificates[2].author(),
1493                certificates[3].author(),
1494            ],
1495            rng,
1496        );
1497
1498        // Initialize the ledger.
1499        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1500
1501        // Initialize the storage.
1502        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1503        // Insert the certificates into the storage.
1504        for certificate in certificates.iter() {
1505            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1506        }
1507
1508        // Get the leader certificate.
1509        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1510        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1511
1512        // Initialize the BFT.
1513        let account = Account::new(rng)?;
1514        let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1515
1516        // Insert a mock DAG in the BFT.
1517        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1518
1519        // Insert the previous certificates into the BFT.
1520        for certificate in certificates.clone() {
1521            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1522        }
1523
1524        // Commit the leader certificate.
1525        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1526
1527        // Simulate a bootup of the BFT.
1528
1529        // Initialize a new instance of storage.
1530        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1531        // Initialize a new instance of BFT.
1532        let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1533
1534        // Sync the BFT DAG at bootup.
1535        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1536
1537        // Check that the BFT starts from the same last committed round.
1538        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1539
1540        // Ensure that both BFTs have committed the leader certificate.
1541        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1542        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1543
1544        // Check the state of the bootup BFT.
1545        for certificate in certificates {
1546            let certificate_round = certificate.round();
1547            let certificate_id = certificate.id();
1548            // Check that the bootup BFT has committed the certificates.
1549            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1550            // Check that the bootup BFT does not contain the certificates in its graph, because
1551            // it should not need to order them again in subsequent subdags.
1552            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1553        }
1554
1555        Ok(())
1556    }
1557
1558    #[tokio::test]
1559    #[tracing_test::traced_test]
1560    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1561        /*
1562        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1563        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1564        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1565        */
1566
1567        let rng = &mut TestRng::default();
1568        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1569
1570        let private_keys = vec![
1571            PrivateKey::new(rng_pks).unwrap(),
1572            PrivateKey::new(rng_pks).unwrap(),
1573            PrivateKey::new(rng_pks).unwrap(),
1574            PrivateKey::new(rng_pks).unwrap(),
1575        ];
1576        let address0 = Address::try_from(private_keys[0])?;
1577
1578        // Initialize the round parameters.
1579        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1580        let committee_round = 0;
1581        let commit_round = 2;
1582        let current_round = commit_round + 1;
1583        let next_round = current_round + 1;
1584
1585        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1586        let (round_to_certificates_map, committee) = {
1587            let addresses = vec![
1588                Address::try_from(private_keys[0])?,
1589                Address::try_from(private_keys[1])?,
1590                Address::try_from(private_keys[2])?,
1591                Address::try_from(private_keys[3])?,
1592            ];
1593            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1594                committee_round,
1595                addresses,
1596                rng,
1597            );
1598            // Initialize a mapping from the round number to the set of batch certificates in the round.
1599            let mut round_to_certificates_map: IndexMap<
1600                u64,
1601                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1602            > = IndexMap::new();
1603            let mut previous_certificates = IndexSet::with_capacity(4);
1604            // Initialize the genesis batch certificates.
1605            for _ in 0..4 {
1606                previous_certificates.insert(sample_batch_certificate(rng));
1607            }
1608            for round in 0..commit_round + 3 {
1609                let mut current_certificates = IndexSet::new();
1610                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1611                    IndexSet::new()
1612                } else {
1613                    previous_certificates.iter().map(|c| c.id()).collect()
1614                };
1615                let transmission_ids =
1616                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1617                        .into_iter()
1618                        .collect::<IndexSet<_>>();
1619                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1620                let committee_id = committee.id();
1621                for (i, private_key_1) in private_keys.iter().enumerate() {
1622                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1623                        private_key_1,
1624                        round,
1625                        timestamp,
1626                        committee_id,
1627                        transmission_ids.clone(),
1628                        previous_certificate_ids.clone(),
1629                        rng,
1630                    )
1631                    .unwrap();
1632                    let mut signatures = IndexSet::with_capacity(4);
1633                    for (j, private_key_2) in private_keys.iter().enumerate() {
1634                        if i != j {
1635                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1636                        }
1637                    }
1638                    let certificate =
1639                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1640                    current_certificates.insert(certificate);
1641                }
1642                // Update the mapping.
1643                round_to_certificates_map.insert(round, current_certificates.clone());
1644                previous_certificates = current_certificates.clone();
1645            }
1646            (round_to_certificates_map, committee)
1647        };
1648
1649        // Initialize the ledger.
1650        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1651        // Initialize the storage.
1652        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1653        // Get the leaders for the next 2 commit rounds.
1654        let leader = address0; // committee.get_leader(commit_round).unwrap();
1655        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1656        // Insert the pre shutdown certificates into the storage.
1657        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1658        for i in 1..=commit_round {
1659            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1660            if i == commit_round {
1661                // Only insert the leader certificate for the commit round.
1662                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1663                if let Some(c) = leader_certificate {
1664                    pre_shutdown_certificates.push(c.clone());
1665                }
1666                continue;
1667            }
1668            pre_shutdown_certificates.extend(certificates);
1669        }
1670        for certificate in pre_shutdown_certificates.iter() {
1671            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1672        }
1673        // Insert the post shutdown certificates into the storage.
1674        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1675            Vec::new();
1676        for j in commit_round..=commit_round + 2 {
1677            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1678            post_shutdown_certificates.extend(certificate);
1679        }
1680        for certificate in post_shutdown_certificates.iter() {
1681            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1682        }
1683        // Get the leader certificates.
1684        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1685        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1686
1687        // Initialize the BFT without bootup.
1688        let account = Account::try_from(private_keys[0])?;
1689        let ledger_dir = default_ledger_dir(1, true, "0");
1690        let storage_mode = amareleo_storage_mode(ledger_dir);
1691        let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1692
1693        // Insert a mock DAG in the BFT without bootup.
1694        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1695
1696        // Insert the certificates into the BFT without bootup.
1697        for certificate in pre_shutdown_certificates.clone() {
1698            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1699        }
1700
1701        // Insert the post shutdown certificates into the BFT without bootup.
1702        for certificate in post_shutdown_certificates.clone() {
1703            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1704        }
1705        // Commit the second leader certificate.
1706        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1707        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1708        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1709
1710        // Simulate a bootup of the BFT.
1711
1712        // Initialize a new instance of storage.
1713        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1714
1715        // Initialize a new instance of BFT with bootup.
1716        let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1717
1718        // Sync the BFT DAG at bootup.
1719        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1720
1721        // Insert the post shutdown certificates to the storage and BFT with bootup.
1722        for certificate in post_shutdown_certificates.iter() {
1723            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1724        }
1725        for certificate in post_shutdown_certificates.clone() {
1726            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1727        }
1728        // Commit the second leader certificate.
1729        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1730        let commit_subdag_metadata_bootup =
1731            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1732        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1733        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1734
1735        // Check that the final state of both BFTs is the same.
1736
1737        // Check that both BFTs start from the same last committed round.
1738        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1739
1740        // Ensure that both BFTs have committed the leader certificates.
1741        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1742        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1743        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1744        assert!(
1745            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1746        );
1747
1748        // Check that the bootup BFT has committed the pre shutdown certificates.
1749        for certificate in pre_shutdown_certificates.clone() {
1750            let certificate_round = certificate.round();
1751            let certificate_id = certificate.id();
1752            // Check that both BFTs have committed the certificates.
1753            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1755            // Check that the bootup BFT does not contain the certificates in its graph, because
1756            // it should not need to order them again in subsequent subdags.
1757            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1759        }
1760
1761        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1762        for certificate in committed_certificates_bootup.clone() {
1763            let certificate_round = certificate.round();
1764            let certificate_id = certificate.id();
1765            // Check that the both BFTs have committed the certificates.
1766            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1767            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1768            // Check that the bootup BFT does not contain the certificates in its graph, because
1769            // it should not need to order them again in subsequent subdags.
1770            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1771            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1772        }
1773
1774        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1775        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1776
1777        Ok(())
1778    }
1779
1780    #[tokio::test]
1781    #[tracing_test::traced_test]
1782    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1783        /*
1784        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1785        2. Add post shutdown certificates to the bootup BFT.
1786        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1787        */
1788
1789        let rng = &mut TestRng::default();
1790        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1791
1792        let private_keys = vec![
1793            PrivateKey::new(rng_pks).unwrap(),
1794            PrivateKey::new(rng_pks).unwrap(),
1795            PrivateKey::new(rng_pks).unwrap(),
1796            PrivateKey::new(rng_pks).unwrap(),
1797        ];
1798        let address0 = Address::try_from(private_keys[0])?;
1799
1800        // Initialize the round parameters.
1801        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1802        let committee_round = 0;
1803        let commit_round = 2;
1804        let current_round = commit_round + 1;
1805        let next_round = current_round + 1;
1806
1807        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1808        let (round_to_certificates_map, committee) = {
1809            let addresses = vec![
1810                Address::try_from(private_keys[0])?,
1811                Address::try_from(private_keys[1])?,
1812                Address::try_from(private_keys[2])?,
1813                Address::try_from(private_keys[3])?,
1814            ];
1815            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1816                committee_round,
1817                addresses,
1818                rng,
1819            );
1820            // Initialize a mapping from the round number to the set of batch certificates in the round.
1821            let mut round_to_certificates_map: IndexMap<
1822                u64,
1823                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1824            > = IndexMap::new();
1825            let mut previous_certificates = IndexSet::with_capacity(4);
1826            // Initialize the genesis batch certificates.
1827            for _ in 0..4 {
1828                previous_certificates.insert(sample_batch_certificate(rng));
1829            }
1830            for round in 0..=commit_round + 2 {
1831                let mut current_certificates = IndexSet::new();
1832                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1833                    IndexSet::new()
1834                } else {
1835                    previous_certificates.iter().map(|c| c.id()).collect()
1836                };
1837                let transmission_ids =
1838                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1839                        .into_iter()
1840                        .collect::<IndexSet<_>>();
1841                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1842                let committee_id = committee.id();
1843                for (i, private_key_1) in private_keys.iter().enumerate() {
1844                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1845                        private_key_1,
1846                        round,
1847                        timestamp,
1848                        committee_id,
1849                        transmission_ids.clone(),
1850                        previous_certificate_ids.clone(),
1851                        rng,
1852                    )
1853                    .unwrap();
1854                    let mut signatures = IndexSet::with_capacity(4);
1855                    for (j, private_key_2) in private_keys.iter().enumerate() {
1856                        if i != j {
1857                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1858                        }
1859                    }
1860                    let certificate =
1861                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1862                    current_certificates.insert(certificate);
1863                }
1864                // Update the mapping.
1865                round_to_certificates_map.insert(round, current_certificates.clone());
1866                previous_certificates = current_certificates.clone();
1867            }
1868            (round_to_certificates_map, committee)
1869        };
1870
1871        // Initialize the ledger.
1872        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1873        // Initialize the storage.
1874        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1875        // Get the leaders for the next 2 commit rounds.
1876        let leader = address0; // committee.get_leader(commit_round).unwrap();
1877        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1878        // Insert the pre shutdown certificates into the storage.
1879        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1880        for i in 1..=commit_round {
1881            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1882            if i == commit_round {
1883                // Only insert the leader certificate for the commit round.
1884                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1885                if let Some(c) = leader_certificate {
1886                    pre_shutdown_certificates.push(c.clone());
1887                }
1888                continue;
1889            }
1890            pre_shutdown_certificates.extend(certificates);
1891        }
1892        for certificate in pre_shutdown_certificates.iter() {
1893            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1894        }
1895        // Initialize the bootup BFT.
1896        let account = Account::try_from(private_keys[0])?;
1897        let bootup_bft =
1898            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1899        // Insert a mock DAG in the BFT without bootup.
1900        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1901        // Sync the BFT DAG at bootup.
1902        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1903
1904        // Insert the post shutdown certificates into the storage.
1905        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1906            Vec::new();
1907        for j in commit_round..=commit_round + 2 {
1908            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1909            post_shutdown_certificates.extend(certificate);
1910        }
1911        for certificate in post_shutdown_certificates.iter() {
1912            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1913        }
1914
1915        // Insert the post shutdown certificates into the DAG.
1916        for certificate in post_shutdown_certificates.clone() {
1917            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1918        }
1919
1920        // Get the next leader certificate to commit.
1921        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1922        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1923        let committed_certificates = commit_subdag.values().flatten();
1924
1925        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1926        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1927            for committed_certificate in committed_certificates.clone() {
1928                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1929            }
1930        }
1931        Ok(())
1932    }
1933}