amareleo_node_bft/
bft.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36    console::account::Address,
37    ledger::{
38        block::Transaction,
39        committee::Committee,
40        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41        puzzle::{Solution, SolutionID},
42    },
43    prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48#[cfg(feature = "locktick")]
49use locktick::{
50    parking_lot::{Mutex, RwLock},
51    tokio::Mutex as TMutex,
52};
53#[cfg(not(feature = "locktick"))]
54use parking_lot::{Mutex, RwLock};
55use std::{
56    collections::{BTreeMap, HashSet},
57    future::Future,
58    sync::{
59        Arc,
60        atomic::{AtomicI64, Ordering},
61    },
62};
63#[cfg(not(feature = "locktick"))]
64use tokio::sync::Mutex as TMutex;
65use tokio::{
66    sync::{OnceCell, oneshot},
67    task::JoinHandle,
68};
69use tracing::subscriber::DefaultGuard;
70
71#[derive(Clone)]
72pub struct BFT<N: Network> {
73    /// The primary.
74    primary: Primary<N>,
75    /// The DAG.
76    dag: Arc<RwLock<DAG<N>>>,
77    /// The batch certificate of the leader from the current even round, if one was present.
78    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
79    /// The timer for the leader certificate to be received.
80    leader_certificate_timer: Arc<AtomicI64>,
81    /// The consensus sender.
82    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
83    /// Tracing handle
84    tracing: Option<TracingHandler>,
85    /// The spawned handles.
86    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
87    /// The BFT lock.
88    bft_lock: Arc<TMutex<()>>,
89}
90
91impl<N: Network> TracingHandlerGuard for BFT<N> {
92    /// Retruns tracing guard
93    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
94        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
95    }
96}
97
98impl<N: Network> BFT<N> {
99    /// Initializes a new instance of the BFT.
100    pub fn new(
101        account: Account<N>,
102        storage: Storage<N>,
103        keep_state: bool,
104        storage_mode: StorageMode,
105        ledger: Arc<dyn LedgerService<N>>,
106        tracing: Option<TracingHandler>,
107    ) -> Result<Self> {
108        Ok(Self {
109            primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
110            dag: Default::default(),
111            leader_certificate: Default::default(),
112            leader_certificate_timer: Default::default(),
113            consensus_sender: Default::default(),
114            tracing: tracing.clone(),
115            handles: Default::default(),
116            bft_lock: Default::default(),
117        })
118    }
119
120    /// Run the BFT instance.
121    pub async fn run(
122        &mut self,
123        consensus_sender: Option<ConsensusSender<N>>,
124        primary_sender: PrimarySender<N>,
125        primary_receiver: PrimaryReceiver<N>,
126    ) -> Result<()> {
127        guard_info!(self, "Starting the BFT instance...");
128
129        // Initialize communications between BFT and Primary.
130        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
131
132        // Start the BFT handlers.
133        self.start_handlers(bft_receiver);
134
135        // Run the primary instance.
136        let result = self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await;
137        if let Err(err) = result {
138            guard_error!(self, "BFT failed to run the primary instance - {err}");
139            self.shut_down().await;
140            return Err(err);
141        }
142
143        // Lastly, set the consensus sender.
144        // Note: This ensures that during initial syncing,
145        // the BFT does not advance the ledger.
146        if let Some(consensus_sender) = consensus_sender {
147            let result = self.consensus_sender.set(consensus_sender);
148            if result.is_err() {
149                self.shut_down().await;
150                guard_error!(self, "Consensus sender already set");
151                bail!("Consensus sender already set");
152            }
153        }
154        Ok(())
155    }
156
157    /// Returns `true` if the primary is synced.
158    pub fn is_synced(&self) -> bool {
159        self.primary.is_synced()
160    }
161
162    /// Returns the primary.
163    pub const fn primary(&self) -> &Primary<N> {
164        &self.primary
165    }
166
167    /// Returns the storage.
168    pub const fn storage(&self) -> &Storage<N> {
169        self.primary.storage()
170    }
171
172    /// Returns the ledger.
173    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
174        self.primary.ledger()
175    }
176
177    /// Returns the leader of the current even round, if one was present.
178    pub fn leader(&self) -> Option<Address<N>> {
179        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
180    }
181
182    /// Returns the certificate of the leader from the current even round, if one was present.
183    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
184        &self.leader_certificate
185    }
186}
187
188impl<N: Network> BFT<N> {
189    /// Returns the number of unconfirmed transmissions.
190    pub fn num_unconfirmed_transmissions(&self) -> usize {
191        self.primary.num_unconfirmed_transmissions()
192    }
193
194    /// Returns the number of unconfirmed ratifications.
195    pub fn num_unconfirmed_ratifications(&self) -> usize {
196        self.primary.num_unconfirmed_ratifications()
197    }
198
199    /// Returns the number of solutions.
200    pub fn num_unconfirmed_solutions(&self) -> usize {
201        self.primary.num_unconfirmed_solutions()
202    }
203
204    /// Returns the number of unconfirmed transactions.
205    pub fn num_unconfirmed_transactions(&self) -> usize {
206        self.primary.num_unconfirmed_transactions()
207    }
208}
209
210impl<N: Network> BFT<N> {
211    /// Returns the worker transmission IDs.
212    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
213        self.primary.worker_transmission_ids()
214    }
215
216    /// Returns the worker transmissions.
217    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
218        self.primary.worker_transmissions()
219    }
220
221    /// Returns the worker solutions.
222    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
223        self.primary.worker_solutions()
224    }
225
226    /// Returns the worker transactions.
227    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
228        self.primary.worker_transactions()
229    }
230}
231
232impl<N: Network> BFT<N> {
233    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
234    fn update_to_next_round(&self, current_round: u64) -> bool {
235        // Ensure the current round is at least the storage round (this is a sanity check).
236        let storage_round = self.storage().current_round();
237        if current_round < storage_round {
238            guard_debug!(
239                self,
240                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
241            );
242            return false;
243        }
244
245        // Determine if the BFT is ready to update to the next round.
246        let is_ready = match current_round % 2 == 0 {
247            true => self.update_leader_certificate_to_even_round(current_round),
248            false => self.is_leader_quorum_or_nonleaders_available(current_round),
249        };
250
251        #[cfg(feature = "metrics")]
252        {
253            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
254            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
255            if start > 0 {
256                let end = now();
257                let elapsed = std::time::Duration::from_secs((end - start) as u64);
258                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
259            }
260        }
261
262        // Log whether the round is going to update.
263        if current_round % 2 == 0 {
264            // Determine if there is a leader certificate.
265            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
266                // Ensure the state of the leader certificate is consistent with the BFT being ready.
267                if !is_ready {
268                    guard_trace!(self, is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
269                }
270                // Log the leader election.
271                let leader_round = leader_certificate.round();
272                match leader_round == current_round {
273                    true => {
274                        guard_info!(
275                            self,
276                            "\n\nRound {current_round} elected a leader - {}\n",
277                            leader_certificate.author()
278                        );
279                        #[cfg(feature = "metrics")]
280                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
281                    }
282                    false => {
283                        guard_warn!(self, "BFT failed to elect a leader for round {current_round} (!= {leader_round})")
284                    }
285                }
286            } else {
287                match is_ready {
288                    true => guard_info!(self, "\n\nRound {current_round} reached quorum without a leader\n"),
289                    false => {
290                        guard_info!(self, "{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed())
291                    }
292                }
293            }
294        }
295
296        // If the BFT is ready, then update to the next round.
297        if is_ready {
298            // Update to the next round in storage.
299            if let Err(e) = self.storage().increment_to_next_round(current_round) {
300                guard_warn!(self, "BFT failed to increment to the next round from round {current_round} - {e}");
301                return false;
302            }
303            // Update the timer for the leader certificate.
304            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
305        }
306
307        is_ready
308    }
309
310    /// Updates the leader certificate to the current even round,
311    /// returning `true` if the BFT is ready to update to the next round.
312    ///
313    /// This method runs on every even round, by determining the leader of the current even round,
314    /// and setting the leader certificate to their certificate in the round, if they were present.
315    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
316        // Retrieve the current round.
317        let current_round = self.storage().current_round();
318        // Ensure the current round matches the given round.
319        if current_round != even_round {
320            guard_warn!(
321                self,
322                "BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"
323            );
324            return false;
325        }
326
327        // If the current round is odd, return false.
328        if current_round % 2 != 0 || current_round < 2 {
329            guard_error!(self, "BFT cannot update the leader certificate in an odd round");
330            return false;
331        }
332
333        // Retrieve the certificates for the current round.
334        let current_certificates = self.storage().get_certificates_for_round(current_round);
335        // If there are no current certificates, set the leader certificate to 'None', and return early.
336        if current_certificates.is_empty() {
337            // Set the leader certificate to 'None'.
338            *self.leader_certificate.write() = None;
339            return false;
340        }
341
342        // Retrieve the committee lookback of the current round.
343        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
344            Ok(committee) => committee,
345            Err(e) => {
346                guard_error!(
347                    self,
348                    "BFT failed to retrieve the committee lookback for the even round {current_round} - {e}"
349                );
350                return false;
351            }
352        };
353        // Determine the leader of the current round.
354        let leader = match self.ledger().latest_leader() {
355            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
356            _ => {
357                // Compute the leader for the current round.
358                let computed_leader = self.primary.account().address();
359                // let computed_leader = match committee_lookback.get_leader(current_round) {
360                //     Ok(leader) => leader,
361                //     Err(e) => {
362                //         guard_error!(self, "BFT failed to compute the leader for the even round {current_round} - {e}");
363                //         return false;
364                //     }
365                // };
366
367                // Cache the computed leader.
368                self.ledger().update_latest_leader(current_round, computed_leader);
369
370                computed_leader
371            }
372        };
373        // Find and set the leader certificate, if the leader was present in the current even round.
374        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
375        *self.leader_certificate.write() = leader_certificate.cloned();
376
377        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
378    }
379
380    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
381    ///  - If the leader certificate is set for the current even round.
382    ///  - The timer for the leader certificate has expired.
383    fn is_even_round_ready_for_next_round(
384        &self,
385        certificates: IndexSet<BatchCertificate<N>>,
386        committee: Committee<N>,
387        current_round: u64,
388    ) -> bool {
389        // Retrieve the authors for the current round.
390        let authors = certificates.into_iter().map(|c| c.author()).collect();
391        // Check if quorum threshold is reached.
392        if !committee.is_quorum_threshold_reached(&authors) {
393            guard_trace!(self, "BFT failed to reach quorum threshold in even round {current_round}");
394            return false;
395        }
396        // If the leader certificate is set for the current even round, return 'true'.
397        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
398            if leader_certificate.round() == current_round {
399                return true;
400            }
401        }
402        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
403        if self.is_timer_expired() {
404            guard_debug!(
405                self,
406                "BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"
407            );
408            return true;
409        }
410        // Otherwise, return 'false'.
411        false
412    }
413
414    /// Returns `true` if the timer for the leader certificate has expired.
415    fn is_timer_expired(&self) -> bool {
416        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
417    }
418
419    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
420    ///  - The leader certificate is `None`.
421    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
422    ///  - The leader certificate timer has expired.
423    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
424        // Retrieve the current round.
425        let current_round = self.storage().current_round();
426        // Ensure the current round matches the given round.
427        if current_round != odd_round {
428            guard_warn!(
429                self,
430                "BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"
431            );
432            return false;
433        }
434        // If the current round is even, return false.
435        if current_round % 2 != 1 {
436            guard_error!(self, "BFT does not compute stakes for the leader certificate in an even round");
437            return false;
438        }
439        // Retrieve the certificates for the current round.
440        let current_certificates = self.storage().get_certificates_for_round(current_round);
441        // Retrieve the committee lookback for the current round.
442        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
443            Ok(committee) => committee,
444            Err(e) => {
445                guard_error!(
446                    self,
447                    "BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}"
448                );
449                return false;
450            }
451        };
452        // Retrieve the authors of the current certificates.
453        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
454        // Check if quorum threshold is reached.
455        if !committee_lookback.is_quorum_threshold_reached(&authors) {
456            guard_trace!(self, "BFT failed reach quorum threshold in odd round {current_round}. ");
457            return false;
458        }
459        // Retrieve the leader certificate.
460        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
461            // If there is no leader certificate for the previous round, return 'true'.
462            return true;
463        };
464        // Compute the stake for the leader certificate.
465        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
466            leader_certificate.id(),
467            current_certificates,
468            &committee_lookback,
469        );
470        // Return 'true' if any of the following conditions hold:
471        stake_with_leader >= committee_lookback.availability_threshold()
472            || stake_without_leader >= committee_lookback.quorum_threshold()
473            || self.is_timer_expired()
474    }
475
476    /// Computes the amount of stake that has & has not signed for the leader certificate.
477    fn compute_stake_for_leader_certificate(
478        &self,
479        leader_certificate_id: Field<N>,
480        current_certificates: IndexSet<BatchCertificate<N>>,
481        current_committee: &Committee<N>,
482    ) -> (u64, u64) {
483        // If there are no current certificates, return early.
484        if current_certificates.is_empty() {
485            return (0, 0);
486        }
487
488        // Initialize a tracker for the stake with the leader.
489        let mut stake_with_leader = 0u64;
490        // Initialize a tracker for the stake without the leader.
491        let mut stake_without_leader = 0u64;
492        // Iterate over the current certificates.
493        for certificate in current_certificates {
494            // Retrieve the stake for the author of the certificate.
495            let stake = current_committee.get_stake(certificate.author());
496            // Determine if the certificate includes the leader.
497            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
498                // If the certificate includes the leader, add the stake to the stake with the leader.
499                true => stake_with_leader = stake_with_leader.saturating_add(stake),
500                // If the certificate does not include the leader, add the stake to the stake without the leader.
501                false => stake_without_leader = stake_without_leader.saturating_add(stake),
502            }
503        }
504        // Return the stake with the leader, and the stake without the leader.
505        (stake_with_leader, stake_without_leader)
506    }
507}
508
509impl<N: Network> BFT<N> {
510    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
511    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
512        &self,
513        certificate: BatchCertificate<N>,
514    ) -> Result<()> {
515        // Acquire the BFT lock.
516        let _lock = self.bft_lock.lock().await;
517        // Retrieve the certificate round.
518        let certificate_round = certificate.round();
519        // Insert the certificate into the DAG.
520        self.dag.write().insert(certificate, self);
521
522        // Construct the commit round.
523        let commit_round = certificate_round.saturating_sub(1);
524        // If the commit round is odd, return early.
525        if commit_round % 2 != 0 || commit_round < 2 {
526            return Ok(());
527        }
528        // If the commit round is at or below the last committed round, return early.
529        if commit_round <= self.dag.read().last_committed_round() {
530            return Ok(());
531        }
532
533        /* Proceeding to check if the leader is ready to be committed. */
534        guard_info!(self, "Checking if the leader is ready to be committed for round {commit_round}...");
535
536        // Retrieve the committee lookback for the commit round.
537        let Ok(_committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
538            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
539        };
540
541        // Either retrieve the cached leader or compute it.
542        let leader = match self.ledger().latest_leader() {
543            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
544            _ => {
545                // Compute the leader for the commit round.
546                let computed_leader = self.primary.account().address();
547                // let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
548                //     bail!("BFT failed to compute the leader for commit round {commit_round}");
549                // };
550
551                // Cache the computed leader.
552                self.ledger().update_latest_leader(commit_round, computed_leader);
553
554                computed_leader
555            }
556        };
557
558        // Retrieve the leader certificate for the commit round.
559        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
560        else {
561            guard_trace!(self, "BFT did not find the leader certificate for commit round {commit_round} yet");
562            return Ok(());
563        };
564        // Retrieve all of the certificates for the **certificate** round.
565        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
566            // TODO (howardwu): Investigate how many certificates we should have at this point.
567            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
568        };
569        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
570        let Ok(certificate_committee_lookback) = self.ledger().get_committee_lookback_for_round(certificate_round)
571        else {
572            bail!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}");
573        };
574        // Construct a set over the authors who included the leader's certificate in the certificate round.
575        let authors = certificates
576            .values()
577            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
578                true => Some(c.author()),
579                false => None,
580            })
581            .collect();
582        // Check if the leader is ready to be committed.
583        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
584            // If the leader is not ready to be committed, return early.
585            guard_trace!(self, "BFT is not ready to commit {commit_round}");
586            return Ok(());
587        }
588
589        /* Proceeding to commit the leader. */
590        guard_info!(self, "Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
591
592        // Commit the leader certificate, and all previous leader certificates since the last committed round.
593        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
594    }
595
596    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
597    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
598        &self,
599        leader_certificate: BatchCertificate<N>,
600    ) -> Result<()> {
601        // Fetch the leader round.
602        let latest_leader_round = leader_certificate.round();
603        // Determine the list of all previous leader certificates since the last committed round.
604        // The order of the leader certificates is from **newest** to **oldest**.
605        let mut leader_certificates = vec![leader_certificate.clone()];
606        {
607            // Retrieve the leader round.
608            let leader_round = leader_certificate.round();
609
610            let mut current_certificate = leader_certificate;
611            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
612            {
613                // // Retrieve the previous committee for the leader round.
614                // let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
615                //     Ok(committee) => committee,
616                //     Err(e) => {
617                //         bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
618                //     }
619                // };
620
621                // Either retrieve the cached leader or compute it.
622                let leader = match self.ledger().latest_leader() {
623                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
624                    _ => {
625                        // Compute the leader for the commit round.
626                        let computed_leader = self.primary.account().address();
627                        // let computed_leader = match previous_committee_lookback.get_leader(round) {
628                        //     Ok(leader) => leader,
629                        //     Err(e) => {
630                        //         bail!("BFT failed to compute the leader for the even round {round} - {e}");
631                        //     }
632                        // };
633
634                        // Cache the computed leader.
635                        self.ledger().update_latest_leader(round, computed_leader);
636
637                        computed_leader
638                    }
639                };
640                // Retrieve the previous leader certificate.
641                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
642                else {
643                    continue;
644                };
645                // Determine if there is a path between the previous certificate and the current certificate.
646                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
647                    // Add the previous leader certificate to the list of certificates to commit.
648                    leader_certificates.push(previous_certificate.clone());
649                    // Update the current certificate to the previous leader certificate.
650                    current_certificate = previous_certificate;
651                }
652            }
653        }
654
655        // Iterate over the leader certificates to commit.
656        for leader_certificate in leader_certificates.into_iter().rev() {
657            // Retrieve the leader certificate round.
658            let leader_round = leader_certificate.round();
659            // Compute the commit subdag.
660            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
661                Ok(subdag) => subdag,
662                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
663            };
664            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
665            if !IS_SYNCING {
666                // Initialize a map for the deduped transmissions.
667                let mut transmissions = IndexMap::new();
668                // Initialize a map for the deduped transaction ids.
669                let mut seen_transaction_ids = IndexSet::new();
670                // Initialize a map for the deduped solution ids.
671                let mut seen_solution_ids = IndexSet::new();
672                // Start from the oldest leader certificate.
673                for certificate in commit_subdag.values().flatten() {
674                    // Retrieve the transmissions.
675                    for transmission_id in certificate.transmission_ids() {
676                        // If the transaction ID or solution ID already exists in the map, skip it.
677                        // Note: This additional check is done to ensure that we do not include duplicate
678                        // transaction IDs or solution IDs that may have a different transmission ID.
679                        match transmission_id {
680                            TransmissionID::Solution(solution_id, _) => {
681                                // If the solution already exists, skip it.
682                                if seen_solution_ids.contains(&solution_id) {
683                                    continue;
684                                }
685                            }
686                            TransmissionID::Transaction(transaction_id, _) => {
687                                // If the transaction already exists, skip it.
688                                if seen_transaction_ids.contains(transaction_id) {
689                                    continue;
690                                }
691                            }
692                            TransmissionID::Ratification => {
693                                bail!("Ratifications are currently not supported in the BFT.")
694                            }
695                        }
696                        // If the transmission already exists in the map, skip it.
697                        if transmissions.contains_key(transmission_id) {
698                            continue;
699                        }
700                        // If the transmission already exists in the ledger, skip it.
701                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
702                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
703                            continue;
704                        }
705                        // Retrieve the transmission.
706                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
707                            bail!(
708                                "BFT failed to retrieve transmission '{}.{}' from round {}",
709                                fmt_id(transmission_id),
710                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
711                                certificate.round()
712                            );
713                        };
714                        // Insert the transaction ID or solution ID into the map.
715                        match transmission_id {
716                            TransmissionID::Solution(id, _) => {
717                                seen_solution_ids.insert(id);
718                            }
719                            TransmissionID::Transaction(id, _) => {
720                                seen_transaction_ids.insert(id);
721                            }
722                            TransmissionID::Ratification => {}
723                        }
724                        // Add the transmission to the set.
725                        transmissions.insert(*transmission_id, transmission);
726                    }
727                }
728                // Trigger consensus, as this will build a new block for the ledger.
729                // Construct the subdag.
730                let subdag = Subdag::from(commit_subdag.clone())?;
731                // Retrieve the anchor round.
732                let anchor_round = subdag.anchor_round();
733                // Retrieve the number of transmissions.
734                let num_transmissions = transmissions.len();
735                // Retrieve metadata about the subdag.
736                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
737
738                // Ensure the subdag anchor round matches the leader round.
739                ensure!(
740                    anchor_round == leader_round,
741                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
742                );
743
744                // Trigger consensus.
745                if let Some(consensus_sender) = self.consensus_sender.get() {
746                    // Initialize a callback sender and receiver.
747                    let (callback_sender, callback_receiver) = oneshot::channel();
748                    // Send the subdag and transmissions to consensus.
749                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
750                    // Await the callback to continue.
751                    match callback_receiver.await {
752                        Ok(Ok(())) => (), // continue
753                        Ok(Err(e)) => {
754                            guard_error!(self, "BFT failed to advance the subdag for round {anchor_round} - {e}");
755                            return Ok(());
756                        }
757                        Err(e) => {
758                            guard_error!(self, "BFT failed to receive the callback for round {anchor_round} - {e}");
759                            return Ok(());
760                        }
761                    }
762                }
763
764                guard_info!(
765                    self,
766                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
767                );
768            }
769
770            // Update the DAG, as the subdag was successfully included into a block.
771            let mut dag_write = self.dag.write();
772            for certificate in commit_subdag.values().flatten() {
773                dag_write.commit(certificate, self.storage().max_gc_rounds());
774            }
775        }
776
777        // Perform garbage collection based on the latest committed leader round.
778        self.storage().garbage_collect_certificates(latest_leader_round);
779
780        Ok(())
781    }
782
783    /// Returns the subdag of batch certificates to commit.
784    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
785        &self,
786        leader_certificate: BatchCertificate<N>,
787    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
788        // Initialize a map for the certificates to commit.
789        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
790        // Initialize a set for the already ordered certificates.
791        let mut already_ordered = HashSet::new();
792        // Initialize a buffer for the certificates to order.
793        let mut buffer = vec![leader_certificate];
794        // Iterate over the certificates to order.
795        while let Some(certificate) = buffer.pop() {
796            // Insert the certificate into the map.
797            commit.entry(certificate.round()).or_default().insert(certificate.clone());
798
799            // Check if the previous certificate is below the GC round.
800            let previous_round = certificate.round().saturating_sub(1);
801            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
802                continue;
803            }
804            // Iterate over the previous certificate IDs.
805            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
806            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
807            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
808                // If the previous certificate is already ordered, continue.
809                if already_ordered.contains(previous_certificate_id) {
810                    continue;
811                }
812                // If the previous certificate was recently committed, continue.
813                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
814                    continue;
815                }
816                // If the previous certificate already exists in the ledger, continue.
817                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
818                    continue;
819                }
820
821                // Retrieve the previous certificate.
822                let previous_certificate = {
823                    // Start by retrieving the previous certificate from the DAG.
824                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
825                        // If the previous certificate is found, return it.
826                        Some(previous_certificate) => previous_certificate,
827                        // If the previous certificate is not found, retrieve it from the storage.
828                        None => match self.storage().get_certificate(*previous_certificate_id) {
829                            // If the previous certificate is found, return it.
830                            Some(previous_certificate) => previous_certificate,
831                            // Otherwise, the previous certificate is missing, and throw an error.
832                            None => bail!(
833                                "Missing previous certificate {} for round {previous_round}",
834                                fmt_id(previous_certificate_id)
835                            ),
836                        },
837                    }
838                };
839                // Insert the previous certificate into the set of already ordered certificates.
840                already_ordered.insert(previous_certificate.id());
841                // Insert the previous certificate into the buffer.
842                buffer.push(previous_certificate);
843            }
844        }
845        // Ensure we only retain certificates that are above the GC round.
846        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
847        // Return the certificates to commit.
848        Ok(commit)
849    }
850
851    /// Returns `true` if there is a path from the previous certificate to the current certificate.
852    fn is_linked(
853        &self,
854        previous_certificate: BatchCertificate<N>,
855        current_certificate: BatchCertificate<N>,
856    ) -> Result<bool> {
857        // Initialize the list containing the traversal.
858        let mut traversal = vec![current_certificate.clone()];
859        // Iterate over the rounds from the current certificate to the previous certificate.
860        for round in (previous_certificate.round()..current_certificate.round()).rev() {
861            // Retrieve all of the certificates for this past round.
862            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
863                // This is a critical error, as the traversal should have these certificates.
864                // If this error is hit, it is likely that the maximum GC rounds should be increased.
865                bail!("BFT failed to retrieve the certificates for past round {round}");
866            };
867            // Filter the certificates to only include those that are in the traversal.
868            traversal = certificates
869                .into_values()
870                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
871                .collect();
872        }
873        Ok(traversal.contains(&previous_certificate))
874    }
875}
876
877impl<N: Network> BFT<N> {
878    /// Starts the BFT handlers.
879    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
880        let BFTReceiver { mut rx_primary_round, mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup } =
881            bft_receiver;
882
883        // Process the current round from the primary.
884        let self_ = self.clone();
885        self.spawn(async move {
886            while let Some((current_round, callback)) = rx_primary_round.recv().await {
887                callback.send(self_.update_to_next_round(current_round)).ok();
888            }
889        });
890
891        // Process the certificate from the primary.
892        let self_ = self.clone();
893        self.spawn(async move {
894            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
895                // Update the DAG with the certificate.
896                let result = self_.update_dag::<true, false>(certificate).await;
897                // Send the callback **after** updating the DAG.
898                // Note: We must await the DAG update before proceeding.
899                callback.send(result).ok();
900            }
901        });
902
903        // Process the request to sync the BFT DAG at bootup.
904        let self_ = self.clone();
905        self.spawn(async move {
906            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
907                self_.sync_bft_dag_at_bootup(certificates).await;
908            }
909        });
910    }
911
912    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
913    /// already exist in the ledger.
914    ///
915    /// This method commits all the certificates into the DAG.
916    /// Note that there is no need to insert the certificates into the DAG, because these certificates
917    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
918    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
919        // Acquire the BFT write lock.
920        let mut dag = self.dag.write();
921
922        // Commit all the certificates.
923        for certificate in certificates {
924            dag.commit(&certificate, self.storage().max_gc_rounds());
925        }
926    }
927
928    /// Spawns a task with the given future; it should only be used for long-running tasks.
929    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
930        self.handles.lock().push(tokio::spawn(future));
931    }
932
933    /// Shuts down the BFT.
934    pub async fn shut_down(&self) {
935        guard_info!(self, "Shutting down the BFT...");
936        // Acquire the lock.
937        let _lock = self.bft_lock.lock().await;
938
939        // Shut down the primary.
940        self.primary.shut_down().await;
941
942        // Abort BFT tasks.
943        let mut handles = self.handles.lock();
944        handles.iter().for_each(|handle| handle.abort());
945        handles.clear();
946    }
947}
948
949#[cfg(test)]
950mod tests {
951    use crate::{
952        BFT,
953        DEVELOPMENT_MODE_RNG_SEED,
954        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
955        helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
956    };
957
958    use amareleo_chain_account::Account;
959    use amareleo_node_bft_ledger_service::MockLedgerService;
960    use amareleo_node_bft_storage_service::BFTMemoryService;
961    use snarkvm::{
962        console::account::{Address, PrivateKey},
963        ledger::{
964            committee::Committee,
965            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
966        },
967        utilities::TestRng,
968    };
969
970    use aleo_std::StorageMode;
971    use anyhow::Result;
972    use indexmap::{IndexMap, IndexSet};
973    use rand::SeedableRng;
974    use rand_chacha::ChaChaRng;
975    use std::sync::Arc;
976
977    type CurrentNetwork = snarkvm::console::network::MainnetV0;
978
979    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
980    fn sample_test_instance(
981        committee_round: Option<u64>,
982        max_gc_rounds: u64,
983        rng: &mut TestRng,
984    ) -> (
985        Committee<CurrentNetwork>,
986        Account<CurrentNetwork>,
987        Arc<MockLedgerService<CurrentNetwork>>,
988        Storage<CurrentNetwork>,
989    ) {
990        let committee = match committee_round {
991            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
992            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
993        };
994        let account = Account::new(rng).unwrap();
995        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
996        let transmissions = Arc::new(BFTMemoryService::new());
997        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
998
999        (committee, account, ledger, storage)
1000    }
1001
1002    #[test]
1003    #[tracing_test::traced_test]
1004    fn test_is_leader_quorum_odd() -> Result<()> {
1005        let rng = &mut TestRng::default();
1006
1007        // Sample batch certificates.
1008        let mut certificates = IndexSet::new();
1009        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1010        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1011        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1012        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1013
1014        // Initialize the committee.
1015        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1016            1,
1017            vec![
1018                certificates[0].author(),
1019                certificates[1].author(),
1020                certificates[2].author(),
1021                certificates[3].author(),
1022            ],
1023            rng,
1024        );
1025
1026        // Initialize the ledger.
1027        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1028        // Initialize the storage.
1029        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1030        // Initialize the account.
1031        let account = Account::new(rng)?;
1032        // Initialize the BFT.
1033        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1034        assert!(bft.is_timer_expired());
1035        // Ensure this call succeeds on an odd round.
1036        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1037        // If timer has expired but quorum threshold is not reached, return 'false'.
1038        assert!(!result);
1039        // Insert certificates into storage.
1040        for certificate in certificates.iter() {
1041            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1042        }
1043        // Ensure this call succeeds on an odd round.
1044        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1045        assert!(result); // no previous leader certificate
1046        // Set the leader certificate.
1047        let leader_certificate = sample_batch_certificate(rng);
1048        *bft.leader_certificate.write() = Some(leader_certificate);
1049        // Ensure this call succeeds on an odd round.
1050        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1051        assert!(result); // should now fall through to the end of function
1052
1053        Ok(())
1054    }
1055
1056    #[test]
1057    #[tracing_test::traced_test]
1058    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1059        let rng = &mut TestRng::default();
1060
1061        // Sample the test instance.
1062        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1063        assert_eq!(committee.starting_round(), 1);
1064        assert_eq!(storage.current_round(), 1);
1065        assert_eq!(storage.max_gc_rounds(), 10);
1066
1067        // Initialize the BFT.
1068        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1069        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1070
1071        // Store is at round 1, and we are checking for round 2.
1072        // Ensure this call fails on an even round.
1073        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1074        assert!(!result);
1075        Ok(())
1076    }
1077
1078    #[test]
1079    #[tracing_test::traced_test]
1080    fn test_is_leader_quorum_even() -> Result<()> {
1081        let rng = &mut TestRng::default();
1082
1083        // Sample the test instance.
1084        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1085        assert_eq!(committee.starting_round(), 2);
1086        assert_eq!(storage.current_round(), 2);
1087        assert_eq!(storage.max_gc_rounds(), 10);
1088
1089        // Initialize the BFT.
1090        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1091        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1092
1093        // Ensure this call fails on an even round.
1094        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1095        assert!(!result);
1096        Ok(())
1097    }
1098
1099    #[test]
1100    #[tracing_test::traced_test]
1101    fn test_is_even_round_ready() -> Result<()> {
1102        let rng = &mut TestRng::default();
1103
1104        // Sample batch certificates.
1105        let mut certificates = IndexSet::new();
1106        certificates.insert(sample_batch_certificate_for_round(2, rng));
1107        certificates.insert(sample_batch_certificate_for_round(2, rng));
1108        certificates.insert(sample_batch_certificate_for_round(2, rng));
1109        certificates.insert(sample_batch_certificate_for_round(2, rng));
1110
1111        // Initialize the committee.
1112        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1113            2,
1114            vec![
1115                certificates[0].author(),
1116                certificates[1].author(),
1117                certificates[2].author(),
1118                certificates[3].author(),
1119            ],
1120            rng,
1121        );
1122
1123        // Initialize the ledger.
1124        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1125        // Initialize the storage.
1126        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1127        // Initialize the account.
1128        let account = Account::new(rng)?;
1129        // Initialize the BFT.
1130        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1131        // Set the leader certificate.
1132        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1133        *bft.leader_certificate.write() = Some(leader_certificate);
1134        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1135        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1136        assert!(!result);
1137        // Once quorum threshold is reached, we are ready for the next round.
1138        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1139        assert!(result);
1140
1141        // Initialize a new BFT.
1142        let bft_timer =
1143            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1144        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1145        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1146        if !bft_timer.is_timer_expired() {
1147            assert!(!result);
1148        }
1149        // Wait for the timer to expire.
1150        let leader_certificate_timeout =
1151            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1152        std::thread::sleep(leader_certificate_timeout);
1153        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1154        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1155        if bft_timer.is_timer_expired() {
1156            assert!(result);
1157        } else {
1158            assert!(!result);
1159        }
1160
1161        Ok(())
1162    }
1163
1164    #[test]
1165    #[tracing_test::traced_test]
1166    fn test_update_leader_certificate_odd() -> Result<()> {
1167        let rng = &mut TestRng::default();
1168
1169        // Sample the test instance.
1170        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1171        assert_eq!(storage.max_gc_rounds(), 10);
1172
1173        // Initialize the BFT.
1174        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1175
1176        // Ensure this call fails on an odd round.
1177        let result = bft.update_leader_certificate_to_even_round(1);
1178        assert!(!result);
1179        Ok(())
1180    }
1181
1182    #[test]
1183    #[tracing_test::traced_test]
1184    fn test_update_leader_certificate_bad_round() -> Result<()> {
1185        let rng = &mut TestRng::default();
1186
1187        // Sample the test instance.
1188        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1189        assert_eq!(storage.max_gc_rounds(), 10);
1190
1191        // Initialize the BFT.
1192        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1193
1194        // Ensure this call succeeds on an even round.
1195        let result = bft.update_leader_certificate_to_even_round(6);
1196        assert!(!result);
1197        Ok(())
1198    }
1199
1200    #[test]
1201    #[tracing_test::traced_test]
1202    fn test_update_leader_certificate_even() -> Result<()> {
1203        let rng = &mut TestRng::default();
1204
1205        // Set the current round.
1206        let current_round = 3;
1207
1208        // Sample the certificates.
1209        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1210            current_round,
1211            rng,
1212        );
1213
1214        // Initialize the committee.
1215        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1216            2,
1217            vec![
1218                certificates[0].author(),
1219                certificates[1].author(),
1220                certificates[2].author(),
1221                certificates[3].author(),
1222            ],
1223            rng,
1224        );
1225
1226        // Initialize the ledger.
1227        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1228
1229        // Initialize the storage.
1230        let transmissions = Arc::new(BFTMemoryService::new());
1231        let storage = Storage::new(ledger.clone(), transmissions, 10, None);
1232        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1233        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1234        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1235        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1236        assert_eq!(storage.current_round(), 2);
1237
1238        // Retrieve the leader certificate.
1239        let leader = certificates[0].author(); // committee.get_leader(2).unwrap();
1240        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1241
1242        // Initialize the BFT.
1243        let account = Account::new(rng)?;
1244        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1245
1246        // Set the leader certificate.
1247        *bft.leader_certificate.write() = Some(leader_certificate);
1248
1249        // Update the leader certificate.
1250        // Ensure this call succeeds on an even round.
1251        let result = bft.update_leader_certificate_to_even_round(2);
1252        assert!(result);
1253
1254        Ok(())
1255    }
1256
1257    #[tokio::test]
1258    #[tracing_test::traced_test]
1259    async fn test_order_dag_with_dfs() -> Result<()> {
1260        let rng = &mut TestRng::default();
1261
1262        // Sample the test instance.
1263        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1264
1265        // Initialize the round parameters.
1266        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1267        let current_round = previous_round + 1;
1268
1269        // Sample the current certificate and previous certificates.
1270        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1271            current_round,
1272            rng,
1273        );
1274
1275        /* Test GC */
1276
1277        // Ensure the function succeeds in returning only certificates above GC.
1278        {
1279            // Initialize the storage.
1280            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1281            // Initialize the BFT.
1282            let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1283
1284            // Insert a mock DAG in the BFT.
1285            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1286
1287            // Insert the previous certificates into the BFT.
1288            for certificate in previous_certificates.clone() {
1289                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1290            }
1291
1292            // Ensure this call succeeds and returns all given certificates.
1293            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1294            assert!(result.is_ok());
1295            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1296            assert_eq!(candidate_certificates.len(), 1);
1297            let expected_certificates = vec![certificate.clone()];
1298            assert_eq!(
1299                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1300                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1301            );
1302            assert_eq!(candidate_certificates, expected_certificates);
1303        }
1304
1305        /* Test normal case */
1306
1307        // Ensure the function succeeds in returning all given certificates.
1308        {
1309            // Initialize the storage.
1310            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1311            // Initialize the BFT.
1312            let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1313
1314            // Insert a mock DAG in the BFT.
1315            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1316
1317            // Insert the previous certificates into the BFT.
1318            for certificate in previous_certificates.clone() {
1319                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1320            }
1321
1322            // Ensure this call succeeds and returns all given certificates.
1323            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1324            assert!(result.is_ok());
1325            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1326            assert_eq!(candidate_certificates.len(), 5);
1327            let expected_certificates = vec![
1328                previous_certificates[0].clone(),
1329                previous_certificates[1].clone(),
1330                previous_certificates[2].clone(),
1331                previous_certificates[3].clone(),
1332                certificate,
1333            ];
1334            assert_eq!(
1335                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1336                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1337            );
1338            assert_eq!(candidate_certificates, expected_certificates);
1339        }
1340
1341        Ok(())
1342    }
1343
1344    #[test]
1345    #[tracing_test::traced_test]
1346    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1347        let rng = &mut TestRng::default();
1348
1349        // Sample the test instance.
1350        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1351        assert_eq!(committee.starting_round(), 1);
1352        assert_eq!(storage.current_round(), 1);
1353        assert_eq!(storage.max_gc_rounds(), 1);
1354
1355        // Initialize the round parameters.
1356        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1357        let current_round = previous_round + 1;
1358
1359        // Sample the current certificate and previous certificates.
1360        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1361            current_round,
1362            rng,
1363        );
1364        // Construct the previous certificate IDs.
1365        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1366
1367        /* Test missing previous certificate. */
1368
1369        // Initialize the BFT.
1370        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1371
1372        // The expected error message.
1373        let error_msg = format!(
1374            "Missing previous certificate {} for round {previous_round}",
1375            crate::helpers::fmt_id(previous_certificate_ids[3]),
1376        );
1377
1378        // Ensure this call fails on a missing previous certificate.
1379        let result = bft.order_dag_with_dfs::<false>(certificate);
1380        assert!(result.is_err());
1381        assert_eq!(result.unwrap_err().to_string(), error_msg);
1382        Ok(())
1383    }
1384
1385    #[tokio::test]
1386    #[tracing_test::traced_test]
1387    async fn test_bft_gc_on_commit() -> Result<()> {
1388        let rng = &mut TestRng::default();
1389
1390        // Initialize the round parameters.
1391        let max_gc_rounds = 1;
1392        let committee_round = 0;
1393        let commit_round = 2;
1394        let current_round = commit_round + 1;
1395
1396        // Sample the certificates.
1397        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1398            current_round,
1399            rng,
1400        );
1401
1402        // Initialize the committee.
1403        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1404            committee_round,
1405            vec![
1406                certificates[0].author(),
1407                certificates[1].author(),
1408                certificates[2].author(),
1409                certificates[3].author(),
1410            ],
1411            rng,
1412        );
1413
1414        // Initialize the ledger.
1415        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1416
1417        // Initialize the storage.
1418        let transmissions = Arc::new(BFTMemoryService::new());
1419        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1420        // Insert the certificates into the storage.
1421        for certificate in certificates.iter() {
1422            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1423        }
1424
1425        // Get the leader certificate.
1426        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1427        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1428
1429        // Initialize the BFT.
1430        let account = Account::new(rng)?;
1431        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1432        // Insert a mock DAG in the BFT.
1433        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1434
1435        // Ensure that the `gc_round` has not been updated yet.
1436        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1437
1438        // Insert the certificates into the BFT.
1439        for certificate in certificates {
1440            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1441        }
1442
1443        // Commit the leader certificate.
1444        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1445
1446        // Ensure that the `gc_round` has been updated.
1447        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1448
1449        Ok(())
1450    }
1451
1452    #[tokio::test]
1453    #[tracing_test::traced_test]
1454    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1455        let rng = &mut TestRng::default();
1456
1457        // Initialize the round parameters.
1458        let max_gc_rounds = 1;
1459        let committee_round = 0;
1460        let commit_round = 2;
1461        let current_round = commit_round + 1;
1462
1463        // Sample the current certificate and previous certificates.
1464        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1465            current_round,
1466            rng,
1467        );
1468
1469        // Initialize the committee.
1470        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1471            committee_round,
1472            vec![
1473                certificates[0].author(),
1474                certificates[1].author(),
1475                certificates[2].author(),
1476                certificates[3].author(),
1477            ],
1478            rng,
1479        );
1480
1481        // Initialize the ledger.
1482        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1483
1484        // Initialize the storage.
1485        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1486        // Insert the certificates into the storage.
1487        for certificate in certificates.iter() {
1488            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1489        }
1490
1491        // Get the leader certificate.
1492        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1493        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1494
1495        // Initialize the BFT.
1496        let account = Account::new(rng)?;
1497        let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1498
1499        // Insert a mock DAG in the BFT.
1500        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1501
1502        // Insert the previous certificates into the BFT.
1503        for certificate in certificates.clone() {
1504            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1505        }
1506
1507        // Commit the leader certificate.
1508        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1509
1510        // Simulate a bootup of the BFT.
1511
1512        // Initialize a new instance of storage.
1513        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1514        // Initialize a new instance of BFT.
1515        let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1516
1517        // Sync the BFT DAG at bootup.
1518        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1519
1520        // Check that the BFT starts from the same last committed round.
1521        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1522
1523        // Ensure that both BFTs have committed the leader certificate.
1524        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1525        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1526
1527        // Check the state of the bootup BFT.
1528        for certificate in certificates {
1529            let certificate_round = certificate.round();
1530            let certificate_id = certificate.id();
1531            // Check that the bootup BFT has committed the certificates.
1532            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1533            // Check that the bootup BFT does not contain the certificates in its graph, because
1534            // it should not need to order them again in subsequent subdags.
1535            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1536        }
1537
1538        Ok(())
1539    }
1540
1541    #[tokio::test]
1542    #[tracing_test::traced_test]
1543    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1544        /*
1545        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1546        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1547        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1548        */
1549
1550        let rng = &mut TestRng::default();
1551        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1552
1553        let private_keys = vec![
1554            PrivateKey::new(rng_pks).unwrap(),
1555            PrivateKey::new(rng_pks).unwrap(),
1556            PrivateKey::new(rng_pks).unwrap(),
1557            PrivateKey::new(rng_pks).unwrap(),
1558        ];
1559        let address0 = Address::try_from(private_keys[0])?;
1560
1561        // Initialize the round parameters.
1562        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1563        let committee_round = 0;
1564        let commit_round = 2;
1565        let current_round = commit_round + 1;
1566        let next_round = current_round + 1;
1567
1568        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1569        let (round_to_certificates_map, committee) = {
1570            let addresses = vec![
1571                Address::try_from(private_keys[0])?,
1572                Address::try_from(private_keys[1])?,
1573                Address::try_from(private_keys[2])?,
1574                Address::try_from(private_keys[3])?,
1575            ];
1576            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1577                committee_round,
1578                addresses,
1579                rng,
1580            );
1581            // Initialize a mapping from the round number to the set of batch certificates in the round.
1582            let mut round_to_certificates_map: IndexMap<
1583                u64,
1584                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1585            > = IndexMap::new();
1586            let mut previous_certificates = IndexSet::with_capacity(4);
1587            // Initialize the genesis batch certificates.
1588            for _ in 0..4 {
1589                previous_certificates.insert(sample_batch_certificate(rng));
1590            }
1591            for round in 0..commit_round + 3 {
1592                let mut current_certificates = IndexSet::new();
1593                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1594                    IndexSet::new()
1595                } else {
1596                    previous_certificates.iter().map(|c| c.id()).collect()
1597                };
1598                let transmission_ids =
1599                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1600                        .into_iter()
1601                        .collect::<IndexSet<_>>();
1602                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1603                let committee_id = committee.id();
1604                for (i, private_key_1) in private_keys.iter().enumerate() {
1605                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1606                        private_key_1,
1607                        round,
1608                        timestamp,
1609                        committee_id,
1610                        transmission_ids.clone(),
1611                        previous_certificate_ids.clone(),
1612                        rng,
1613                    )
1614                    .unwrap();
1615                    let mut signatures = IndexSet::with_capacity(4);
1616                    for (j, private_key_2) in private_keys.iter().enumerate() {
1617                        if i != j {
1618                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1619                        }
1620                    }
1621                    let certificate =
1622                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1623                    current_certificates.insert(certificate);
1624                }
1625                // Update the mapping.
1626                round_to_certificates_map.insert(round, current_certificates.clone());
1627                previous_certificates = current_certificates.clone();
1628            }
1629            (round_to_certificates_map, committee)
1630        };
1631
1632        // Initialize the ledger.
1633        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1634        // Initialize the storage.
1635        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1636        // Get the leaders for the next 2 commit rounds.
1637        let leader = address0; // committee.get_leader(commit_round).unwrap();
1638        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1639        // Insert the pre shutdown certificates into the storage.
1640        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1641        for i in 1..=commit_round {
1642            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1643            if i == commit_round {
1644                // Only insert the leader certificate for the commit round.
1645                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1646                if let Some(c) = leader_certificate {
1647                    pre_shutdown_certificates.push(c.clone());
1648                }
1649                continue;
1650            }
1651            pre_shutdown_certificates.extend(certificates);
1652        }
1653        for certificate in pre_shutdown_certificates.iter() {
1654            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1655        }
1656        // Insert the post shutdown certificates into the storage.
1657        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1658            Vec::new();
1659        for j in commit_round..=commit_round + 2 {
1660            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1661            post_shutdown_certificates.extend(certificate);
1662        }
1663        for certificate in post_shutdown_certificates.iter() {
1664            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1665        }
1666        // Get the leader certificates.
1667        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1668        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1669
1670        // Initialize the BFT without bootup.
1671        let account = Account::try_from(private_keys[0])?;
1672        let ledger_dir = default_ledger_dir(1, true, "0");
1673        let storage_mode = amareleo_storage_mode(ledger_dir);
1674        let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1675
1676        // Insert a mock DAG in the BFT without bootup.
1677        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1678
1679        // Insert the certificates into the BFT without bootup.
1680        for certificate in pre_shutdown_certificates.clone() {
1681            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1682        }
1683
1684        // Insert the post shutdown certificates into the BFT without bootup.
1685        for certificate in post_shutdown_certificates.clone() {
1686            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1687        }
1688        // Commit the second leader certificate.
1689        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1690        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1691        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1692
1693        // Simulate a bootup of the BFT.
1694
1695        // Initialize a new instance of storage.
1696        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1697
1698        // Initialize a new instance of BFT with bootup.
1699        let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1700
1701        // Sync the BFT DAG at bootup.
1702        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1703
1704        // Insert the post shutdown certificates to the storage and BFT with bootup.
1705        for certificate in post_shutdown_certificates.iter() {
1706            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1707        }
1708        for certificate in post_shutdown_certificates.clone() {
1709            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1710        }
1711        // Commit the second leader certificate.
1712        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1713        let commit_subdag_metadata_bootup =
1714            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1715        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1716        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1717
1718        // Check that the final state of both BFTs is the same.
1719
1720        // Check that both BFTs start from the same last committed round.
1721        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1722
1723        // Ensure that both BFTs have committed the leader certificates.
1724        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1725        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1726        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1727        assert!(
1728            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1729        );
1730
1731        // Check that the bootup BFT has committed the pre shutdown certificates.
1732        for certificate in pre_shutdown_certificates.clone() {
1733            let certificate_round = certificate.round();
1734            let certificate_id = certificate.id();
1735            // Check that both BFTs have committed the certificates.
1736            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1737            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1738            // Check that the bootup BFT does not contain the certificates in its graph, because
1739            // it should not need to order them again in subsequent subdags.
1740            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1741            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1742        }
1743
1744        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1745        for certificate in committed_certificates_bootup.clone() {
1746            let certificate_round = certificate.round();
1747            let certificate_id = certificate.id();
1748            // Check that the both BFTs have committed the certificates.
1749            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1750            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1751            // Check that the bootup BFT does not contain the certificates in its graph, because
1752            // it should not need to order them again in subsequent subdags.
1753            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1754            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1755        }
1756
1757        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1758        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1759
1760        Ok(())
1761    }
1762
1763    #[tokio::test]
1764    #[tracing_test::traced_test]
1765    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1766        /*
1767        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1768        2. Add post shutdown certificates to the bootup BFT.
1769        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1770        */
1771
1772        let rng = &mut TestRng::default();
1773        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1774
1775        let private_keys = vec![
1776            PrivateKey::new(rng_pks).unwrap(),
1777            PrivateKey::new(rng_pks).unwrap(),
1778            PrivateKey::new(rng_pks).unwrap(),
1779            PrivateKey::new(rng_pks).unwrap(),
1780        ];
1781        let address0 = Address::try_from(private_keys[0])?;
1782
1783        // Initialize the round parameters.
1784        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1785        let committee_round = 0;
1786        let commit_round = 2;
1787        let current_round = commit_round + 1;
1788        let next_round = current_round + 1;
1789
1790        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1791        let (round_to_certificates_map, committee) = {
1792            let addresses = vec![
1793                Address::try_from(private_keys[0])?,
1794                Address::try_from(private_keys[1])?,
1795                Address::try_from(private_keys[2])?,
1796                Address::try_from(private_keys[3])?,
1797            ];
1798            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1799                committee_round,
1800                addresses,
1801                rng,
1802            );
1803            // Initialize a mapping from the round number to the set of batch certificates in the round.
1804            let mut round_to_certificates_map: IndexMap<
1805                u64,
1806                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1807            > = IndexMap::new();
1808            let mut previous_certificates = IndexSet::with_capacity(4);
1809            // Initialize the genesis batch certificates.
1810            for _ in 0..4 {
1811                previous_certificates.insert(sample_batch_certificate(rng));
1812            }
1813            for round in 0..=commit_round + 2 {
1814                let mut current_certificates = IndexSet::new();
1815                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1816                    IndexSet::new()
1817                } else {
1818                    previous_certificates.iter().map(|c| c.id()).collect()
1819                };
1820                let transmission_ids =
1821                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1822                        .into_iter()
1823                        .collect::<IndexSet<_>>();
1824                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1825                let committee_id = committee.id();
1826                for (i, private_key_1) in private_keys.iter().enumerate() {
1827                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1828                        private_key_1,
1829                        round,
1830                        timestamp,
1831                        committee_id,
1832                        transmission_ids.clone(),
1833                        previous_certificate_ids.clone(),
1834                        rng,
1835                    )
1836                    .unwrap();
1837                    let mut signatures = IndexSet::with_capacity(4);
1838                    for (j, private_key_2) in private_keys.iter().enumerate() {
1839                        if i != j {
1840                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1841                        }
1842                    }
1843                    let certificate =
1844                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1845                    current_certificates.insert(certificate);
1846                }
1847                // Update the mapping.
1848                round_to_certificates_map.insert(round, current_certificates.clone());
1849                previous_certificates = current_certificates.clone();
1850            }
1851            (round_to_certificates_map, committee)
1852        };
1853
1854        // Initialize the ledger.
1855        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1856        // Initialize the storage.
1857        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1858        // Get the leaders for the next 2 commit rounds.
1859        let leader = address0; // committee.get_leader(commit_round).unwrap();
1860        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1861        // Insert the pre shutdown certificates into the storage.
1862        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1863        for i in 1..=commit_round {
1864            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1865            if i == commit_round {
1866                // Only insert the leader certificate for the commit round.
1867                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1868                if let Some(c) = leader_certificate {
1869                    pre_shutdown_certificates.push(c.clone());
1870                }
1871                continue;
1872            }
1873            pre_shutdown_certificates.extend(certificates);
1874        }
1875        for certificate in pre_shutdown_certificates.iter() {
1876            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1877        }
1878        // Initialize the bootup BFT.
1879        let account = Account::try_from(private_keys[0])?;
1880        let bootup_bft =
1881            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1882        // Insert a mock DAG in the BFT without bootup.
1883        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1884        // Sync the BFT DAG at bootup.
1885        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1886
1887        // Insert the post shutdown certificates into the storage.
1888        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1889            Vec::new();
1890        for j in commit_round..=commit_round + 2 {
1891            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1892            post_shutdown_certificates.extend(certificate);
1893        }
1894        for certificate in post_shutdown_certificates.iter() {
1895            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1896        }
1897
1898        // Insert the post shutdown certificates into the DAG.
1899        for certificate in post_shutdown_certificates.clone() {
1900            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1901        }
1902
1903        // Get the next leader certificate to commit.
1904        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1905        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1906        let committed_certificates = commit_subdag.values().flatten();
1907
1908        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1909        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1910            for committed_certificate in committed_certificates.clone() {
1911                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1912            }
1913        }
1914        Ok(())
1915    }
1916}