amareleo_node_bft/
bft.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    Primary,
19    helpers::{
20        BFTReceiver,
21        ConsensusSender,
22        DAG,
23        PrimaryReceiver,
24        PrimarySender,
25        Storage,
26        fmt_id,
27        init_bft_channels,
28        now,
29    },
30};
31use aleo_std::StorageMode;
32use amareleo_chain_account::Account;
33use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
34use amareleo_node_bft_ledger_service::LedgerService;
35use snarkvm::{
36    console::account::Address,
37    ledger::{
38        block::Transaction,
39        committee::Committee,
40        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
41        puzzle::{Solution, SolutionID},
42    },
43    prelude::{Field, Network, Result, bail, ensure},
44};
45
46use colored::Colorize;
47use indexmap::{IndexMap, IndexSet};
48use parking_lot::{Mutex, RwLock};
49use std::{
50    collections::{BTreeMap, HashSet},
51    future::Future,
52    sync::{
53        Arc,
54        atomic::{AtomicI64, Ordering},
55    },
56};
57use tokio::{
58    sync::{Mutex as TMutex, OnceCell, oneshot},
59    task::JoinHandle,
60};
61use tracing::subscriber::DefaultGuard;
62
/// The BFT instance: a [`Primary`] plus the DAG and leader-tracking state built on top of it.
///
/// The struct derives `Clone`; the fields wrapped in `Arc` are shared between clones.
#[derive(Clone)]
pub struct BFT<N: Network> {
    /// The primary.
    primary: Primary<N>,
    /// The DAG.
    dag: Arc<RwLock<DAG<N>>>,
    /// The batch certificate of the leader from the current even round, if one was present.
    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
    /// The timer for the leader certificate to be received.
    /// Holds the value of `now()` stored when the BFT last advanced a round; it is compared against
    /// `MAX_LEADER_CERTIFICATE_DELAY_IN_SECS` to decide whether the wait for the leader has expired.
    leader_certificate_timer: Arc<AtomicI64>,
    /// The consensus sender.
    /// It is set at most once, at the end of `run`, and only if a sender was supplied.
    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
    /// The optional tracing handler, used to obtain tracing guards for the `guard_*!` log macros.
    tracing: Option<TracingHandler>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The BFT lock.
    /// It is held for the full duration of each DAG update.
    bft_lock: Arc<TMutex<()>>,
}
82
83impl<N: Network> TracingHandlerGuard for BFT<N> {
84    /// Retruns tracing guard
85    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
86        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
87    }
88}
89
90impl<N: Network> BFT<N> {
91    /// Initializes a new instance of the BFT.
92    pub fn new(
93        account: Account<N>,
94        storage: Storage<N>,
95        keep_state: bool,
96        storage_mode: StorageMode,
97        ledger: Arc<dyn LedgerService<N>>,
98        tracing: Option<TracingHandler>,
99    ) -> Result<Self> {
100        Ok(Self {
101            primary: Primary::new(account, storage, keep_state, storage_mode, ledger, tracing.clone())?,
102            dag: Default::default(),
103            leader_certificate: Default::default(),
104            leader_certificate_timer: Default::default(),
105            consensus_sender: Default::default(),
106            tracing: tracing.clone(),
107            handles: Default::default(),
108            bft_lock: Default::default(),
109        })
110    }
111
112    /// Run the BFT instance.
113    pub async fn run(
114        &mut self,
115        consensus_sender: Option<ConsensusSender<N>>,
116        primary_sender: PrimarySender<N>,
117        primary_receiver: PrimaryReceiver<N>,
118    ) -> Result<()> {
119        guard_info!(self, "Starting the BFT instance...");
120
121        // Initialize communications between BFT and Primary.
122        let (bft_sender, bft_receiver) = init_bft_channels::<N>();
123
124        // Start the BFT handlers.
125        self.start_handlers(bft_receiver);
126
127        // Run the primary instance.
128        let result = self.primary.run(Some(bft_sender), primary_sender, primary_receiver).await;
129        if let Err(err) = result {
130            guard_error!(self, "BFT failed to run the primary instance - {err}");
131            self.shut_down().await;
132            return Err(err);
133        }
134
135        // Lastly, set the consensus sender.
136        // Note: This ensures that during initial syncing,
137        // the BFT does not advance the ledger.
138        if let Some(consensus_sender) = consensus_sender {
139            let result = self.consensus_sender.set(consensus_sender);
140            if result.is_err() {
141                self.shut_down().await;
142                guard_error!(self, "Consensus sender already set");
143                bail!("Consensus sender already set");
144            }
145        }
146        Ok(())
147    }
148
149    /// Returns `true` if the primary is synced.
150    pub fn is_synced(&self) -> bool {
151        self.primary.is_synced()
152    }
153
154    /// Returns the primary.
155    pub const fn primary(&self) -> &Primary<N> {
156        &self.primary
157    }
158
159    /// Returns the storage.
160    pub const fn storage(&self) -> &Storage<N> {
161        self.primary.storage()
162    }
163
164    /// Returns the ledger.
165    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
166        self.primary.ledger()
167    }
168
169    /// Returns the leader of the current even round, if one was present.
170    pub fn leader(&self) -> Option<Address<N>> {
171        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
172    }
173
174    /// Returns the certificate of the leader from the current even round, if one was present.
175    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
176        &self.leader_certificate
177    }
178}
179
// Counters for unconfirmed items; each one delegates to the primary.
impl<N: Network> BFT<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.primary.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.primary.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.primary.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.primary.num_unconfirmed_transactions()
    }
}
201
202impl<N: Network> BFT<N> {
203    /// Returns the worker transmission IDs.
204    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
205        self.primary.worker_transmission_ids()
206    }
207
208    /// Returns the worker transmissions.
209    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
210        self.primary.worker_transmissions()
211    }
212
213    /// Returns the worker solutions.
214    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
215        self.primary.worker_solutions()
216    }
217
218    /// Returns the worker transactions.
219    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
220        self.primary.worker_transactions()
221    }
222}
223
224impl<N: Network> BFT<N> {
225    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
226    fn update_to_next_round(&self, current_round: u64) -> bool {
227        // Ensure the current round is at least the storage round (this is a sanity check).
228        let storage_round = self.storage().current_round();
229        if current_round < storage_round {
230            guard_debug!(
231                self,
232                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
233            );
234            return false;
235        }
236
237        // Determine if the BFT is ready to update to the next round.
238        let is_ready = match current_round % 2 == 0 {
239            true => self.update_leader_certificate_to_even_round(current_round),
240            false => self.is_leader_quorum_or_nonleaders_available(current_round),
241        };
242
243        #[cfg(feature = "metrics")]
244        {
245            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
246            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
247            if start > 0 {
248                let end = now();
249                let elapsed = std::time::Duration::from_secs((end - start) as u64);
250                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
251            }
252        }
253
254        // Log whether the round is going to update.
255        if current_round % 2 == 0 {
256            // Determine if there is a leader certificate.
257            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
258                // Ensure the state of the leader certificate is consistent with the BFT being ready.
259                if !is_ready {
260                    guard_trace!(self, is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
261                }
262                // Log the leader election.
263                let leader_round = leader_certificate.round();
264                match leader_round == current_round {
265                    true => {
266                        guard_info!(
267                            self,
268                            "\n\nRound {current_round} elected a leader - {}\n",
269                            leader_certificate.author()
270                        );
271                        #[cfg(feature = "metrics")]
272                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
273                    }
274                    false => {
275                        guard_warn!(self, "BFT failed to elect a leader for round {current_round} (!= {leader_round})")
276                    }
277                }
278            } else {
279                match is_ready {
280                    true => guard_info!(self, "\n\nRound {current_round} reached quorum without a leader\n"),
281                    false => {
282                        guard_info!(self, "{}", format!("\n\nRound {current_round} did not elect a leader\n").dimmed())
283                    }
284                }
285            }
286        }
287
288        // If the BFT is ready, then update to the next round.
289        if is_ready {
290            // Update to the next round in storage.
291            if let Err(e) = self.storage().increment_to_next_round(current_round) {
292                guard_warn!(self, "BFT failed to increment to the next round from round {current_round} - {e}");
293                return false;
294            }
295            // Update the timer for the leader certificate.
296            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
297        }
298
299        is_ready
300    }
301
302    /// Updates the leader certificate to the current even round,
303    /// returning `true` if the BFT is ready to update to the next round.
304    ///
305    /// This method runs on every even round, by determining the leader of the current even round,
306    /// and setting the leader certificate to their certificate in the round, if they were present.
307    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
308        // Retrieve the current round.
309        let current_round = self.storage().current_round();
310        // Ensure the current round matches the given round.
311        if current_round != even_round {
312            guard_warn!(
313                self,
314                "BFT storage (at round {current_round}) is out of sync with the current even round {even_round}"
315            );
316            return false;
317        }
318
319        // If the current round is odd, return false.
320        if current_round % 2 != 0 || current_round < 2 {
321            guard_error!(self, "BFT cannot update the leader certificate in an odd round");
322            return false;
323        }
324
325        // Retrieve the certificates for the current round.
326        let current_certificates = self.storage().get_certificates_for_round(current_round);
327        // If there are no current certificates, set the leader certificate to 'None', and return early.
328        if current_certificates.is_empty() {
329            // Set the leader certificate to 'None'.
330            *self.leader_certificate.write() = None;
331            return false;
332        }
333
334        // Retrieve the committee lookback of the current round.
335        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
336            Ok(committee) => committee,
337            Err(e) => {
338                guard_error!(
339                    self,
340                    "BFT failed to retrieve the committee lookback for the even round {current_round} - {e}"
341                );
342                return false;
343            }
344        };
345        // Determine the leader of the current round.
346        let leader = match self.ledger().latest_leader() {
347            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
348            _ => {
349                // Compute the leader for the current round.
350                let computed_leader = self.primary.account().address();
351                // let computed_leader = match committee_lookback.get_leader(current_round) {
352                //     Ok(leader) => leader,
353                //     Err(e) => {
354                //         guard_error!(self, "BFT failed to compute the leader for the even round {current_round} - {e}");
355                //         return false;
356                //     }
357                // };
358
359                // Cache the computed leader.
360                self.ledger().update_latest_leader(current_round, computed_leader);
361
362                computed_leader
363            }
364        };
365        // Find and set the leader certificate, if the leader was present in the current even round.
366        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
367        *self.leader_certificate.write() = leader_certificate.cloned();
368
369        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
370    }
371
372    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
373    ///  - If the leader certificate is set for the current even round.
374    ///  - The timer for the leader certificate has expired.
375    fn is_even_round_ready_for_next_round(
376        &self,
377        certificates: IndexSet<BatchCertificate<N>>,
378        committee: Committee<N>,
379        current_round: u64,
380    ) -> bool {
381        // Retrieve the authors for the current round.
382        let authors = certificates.into_iter().map(|c| c.author()).collect();
383        // Check if quorum threshold is reached.
384        if !committee.is_quorum_threshold_reached(&authors) {
385            guard_trace!(self, "BFT failed to reach quorum threshold in even round {current_round}");
386            return false;
387        }
388        // If the leader certificate is set for the current even round, return 'true'.
389        if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
390            if leader_certificate.round() == current_round {
391                return true;
392            }
393        }
394        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
395        if self.is_timer_expired() {
396            guard_debug!(
397                self,
398                "BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)"
399            );
400            return true;
401        }
402        // Otherwise, return 'false'.
403        false
404    }
405
406    /// Returns `true` if the timer for the leader certificate has expired.
407    fn is_timer_expired(&self) -> bool {
408        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
409    }
410
411    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
412    ///  - The leader certificate is `None`.
413    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
414    ///  - The leader certificate timer has expired.
415    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
416        // Retrieve the current round.
417        let current_round = self.storage().current_round();
418        // Ensure the current round matches the given round.
419        if current_round != odd_round {
420            guard_warn!(
421                self,
422                "BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}"
423            );
424            return false;
425        }
426        // If the current round is even, return false.
427        if current_round % 2 != 1 {
428            guard_error!(self, "BFT does not compute stakes for the leader certificate in an even round");
429            return false;
430        }
431        // Retrieve the certificates for the current round.
432        let current_certificates = self.storage().get_certificates_for_round(current_round);
433        // Retrieve the committee lookback for the current round.
434        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
435            Ok(committee) => committee,
436            Err(e) => {
437                guard_error!(
438                    self,
439                    "BFT failed to retrieve the committee lookback for the odd round {current_round} - {e}"
440                );
441                return false;
442            }
443        };
444        // Retrieve the authors of the current certificates.
445        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
446        // Check if quorum threshold is reached.
447        if !committee_lookback.is_quorum_threshold_reached(&authors) {
448            guard_trace!(self, "BFT failed reach quorum threshold in odd round {current_round}. ");
449            return false;
450        }
451        // Retrieve the leader certificate.
452        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
453            // If there is no leader certificate for the previous round, return 'true'.
454            return true;
455        };
456        // Compute the stake for the leader certificate.
457        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
458            leader_certificate.id(),
459            current_certificates,
460            &committee_lookback,
461        );
462        // Return 'true' if any of the following conditions hold:
463        stake_with_leader >= committee_lookback.availability_threshold()
464            || stake_without_leader >= committee_lookback.quorum_threshold()
465            || self.is_timer_expired()
466    }
467
468    /// Computes the amount of stake that has & has not signed for the leader certificate.
469    fn compute_stake_for_leader_certificate(
470        &self,
471        leader_certificate_id: Field<N>,
472        current_certificates: IndexSet<BatchCertificate<N>>,
473        current_committee: &Committee<N>,
474    ) -> (u64, u64) {
475        // If there are no current certificates, return early.
476        if current_certificates.is_empty() {
477            return (0, 0);
478        }
479
480        // Initialize a tracker for the stake with the leader.
481        let mut stake_with_leader = 0u64;
482        // Initialize a tracker for the stake without the leader.
483        let mut stake_without_leader = 0u64;
484        // Iterate over the current certificates.
485        for certificate in current_certificates {
486            // Retrieve the stake for the author of the certificate.
487            let stake = current_committee.get_stake(certificate.author());
488            // Determine if the certificate includes the leader.
489            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
490                // If the certificate includes the leader, add the stake to the stake with the leader.
491                true => stake_with_leader = stake_with_leader.saturating_add(stake),
492                // If the certificate does not include the leader, add the stake to the stake without the leader.
493                false => stake_without_leader = stake_without_leader.saturating_add(stake),
494            }
495        }
496        // Return the stake with the leader, and the stake without the leader.
497        (stake_with_leader, stake_without_leader)
498    }
499}
500
501impl<N: Network> BFT<N> {
502    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
503    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
504        &self,
505        certificate: BatchCertificate<N>,
506    ) -> Result<()> {
507        // Acquire the BFT lock.
508        let _lock = self.bft_lock.lock().await;
509        // Retrieve the certificate round.
510        let certificate_round = certificate.round();
511        // Insert the certificate into the DAG.
512        self.dag.write().insert(certificate, self);
513
514        // Construct the commit round.
515        let commit_round = certificate_round.saturating_sub(1);
516        // If the commit round is odd, return early.
517        if commit_round % 2 != 0 || commit_round < 2 {
518            return Ok(());
519        }
520        // If the commit round is at or below the last committed round, return early.
521        if commit_round <= self.dag.read().last_committed_round() {
522            return Ok(());
523        }
524
525        /* Proceeding to check if the leader is ready to be committed. */
526        guard_info!(self, "Checking if the leader is ready to be committed for round {commit_round}...");
527
528        // Retrieve the committee lookback for the commit round.
529        let Ok(committee_lookback) = self.ledger().get_committee_lookback_for_round(commit_round) else {
530            bail!("BFT failed to retrieve the committee with lag for commit round {commit_round}");
531        };
532
533        // Either retrieve the cached leader or compute it.
534        let leader = match self.ledger().latest_leader() {
535            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
536            _ => {
537                // Compute the leader for the commit round.
538                let computed_leader = self.primary.account().address();
539                // let Ok(computed_leader) = committee_lookback.get_leader(commit_round) else {
540                //     bail!("BFT failed to compute the leader for commit round {commit_round}");
541                // };
542
543                // Cache the computed leader.
544                self.ledger().update_latest_leader(commit_round, computed_leader);
545
546                computed_leader
547            }
548        };
549
550        // Retrieve the leader certificate for the commit round.
551        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
552        else {
553            guard_trace!(self, "BFT did not find the leader certificate for commit round {commit_round} yet");
554            return Ok(());
555        };
556        // Retrieve all of the certificates for the **certificate** round.
557        let Some(certificates) = self.dag.read().get_certificates_for_round(certificate_round) else {
558            // TODO (howardwu): Investigate how many certificates we should have at this point.
559            bail!("BFT failed to retrieve the certificates for certificate round {certificate_round}");
560        };
561        // Construct a set over the authors who included the leader's certificate in the certificate round.
562        let authors = certificates
563            .values()
564            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
565                true => Some(c.author()),
566                false => None,
567            })
568            .collect();
569        // Check if the leader is ready to be committed.
570        if !committee_lookback.is_availability_threshold_reached(&authors) {
571            // If the leader is not ready to be committed, return early.
572            guard_trace!(self, "BFT is not ready to commit {commit_round}");
573            return Ok(());
574        }
575
576        /* Proceeding to commit the leader. */
577        guard_info!(self, "Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
578
579        // Commit the leader certificate, and all previous leader certificates since the last committed round.
580        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
581    }
582
583    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
584    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
585        &self,
586        leader_certificate: BatchCertificate<N>,
587    ) -> Result<()> {
588        // Fetch the leader round.
589        let latest_leader_round = leader_certificate.round();
590        // Determine the list of all previous leader certificates since the last committed round.
591        // The order of the leader certificates is from **newest** to **oldest**.
592        let mut leader_certificates = vec![leader_certificate.clone()];
593        {
594            // Retrieve the leader round.
595            let leader_round = leader_certificate.round();
596
597            let mut current_certificate = leader_certificate;
598            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
599            {
600                // // Retrieve the previous committee for the leader round.
601                // let previous_committee_lookback = match self.ledger().get_committee_lookback_for_round(round) {
602                //     Ok(committee) => committee,
603                //     Err(e) => {
604                //         bail!("BFT failed to retrieve a previous committee lookback for the even round {round} - {e}");
605                //     }
606                // };
607
608                // Either retrieve the cached leader or compute it.
609                let leader = match self.ledger().latest_leader() {
610                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
611                    _ => {
612                        // Compute the leader for the commit round.
613                        let computed_leader = self.primary.account().address();
614                        // let computed_leader = match previous_committee_lookback.get_leader(round) {
615                        //     Ok(leader) => leader,
616                        //     Err(e) => {
617                        //         bail!("BFT failed to compute the leader for the even round {round} - {e}");
618                        //     }
619                        // };
620
621                        // Cache the computed leader.
622                        self.ledger().update_latest_leader(round, computed_leader);
623
624                        computed_leader
625                    }
626                };
627                // Retrieve the previous leader certificate.
628                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
629                else {
630                    continue;
631                };
632                // Determine if there is a path between the previous certificate and the current certificate.
633                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
634                    // Add the previous leader certificate to the list of certificates to commit.
635                    leader_certificates.push(previous_certificate.clone());
636                    // Update the current certificate to the previous leader certificate.
637                    current_certificate = previous_certificate;
638                }
639            }
640        }
641
642        // Iterate over the leader certificates to commit.
643        for leader_certificate in leader_certificates.into_iter().rev() {
644            // Retrieve the leader certificate round.
645            let leader_round = leader_certificate.round();
646            // Compute the commit subdag.
647            let commit_subdag = match self.order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate) {
648                Ok(subdag) => subdag,
649                Err(e) => bail!("BFT failed to order the DAG with DFS - {e}"),
650            };
651            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
652            if !IS_SYNCING {
653                // Initialize a map for the deduped transmissions.
654                let mut transmissions = IndexMap::new();
655                // Initialize a map for the deduped transaction ids.
656                let mut seen_transaction_ids = IndexSet::new();
657                // Initialize a map for the deduped solution ids.
658                let mut seen_solution_ids = IndexSet::new();
659                // Start from the oldest leader certificate.
660                for certificate in commit_subdag.values().flatten() {
661                    // Retrieve the transmissions.
662                    for transmission_id in certificate.transmission_ids() {
663                        // If the transaction ID or solution ID already exists in the map, skip it.
664                        // Note: This additional check is done to ensure that we do not include duplicate
665                        // transaction IDs or solution IDs that may have a different transmission ID.
666                        match transmission_id {
667                            TransmissionID::Solution(solution_id, _) => {
668                                // If the solution already exists, skip it.
669                                if seen_solution_ids.contains(&solution_id) {
670                                    continue;
671                                }
672                            }
673                            TransmissionID::Transaction(transaction_id, _) => {
674                                // If the transaction already exists, skip it.
675                                if seen_transaction_ids.contains(transaction_id) {
676                                    continue;
677                                }
678                            }
679                            TransmissionID::Ratification => {
680                                bail!("Ratifications are currently not supported in the BFT.")
681                            }
682                        }
683                        // If the transmission already exists in the map, skip it.
684                        if transmissions.contains_key(transmission_id) {
685                            continue;
686                        }
687                        // If the transmission already exists in the ledger, skip it.
688                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
689                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
690                            continue;
691                        }
692                        // Retrieve the transmission.
693                        let Some(transmission) = self.storage().get_transmission(*transmission_id) else {
694                            bail!(
695                                "BFT failed to retrieve transmission '{}.{}' from round {}",
696                                fmt_id(transmission_id),
697                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
698                                certificate.round()
699                            );
700                        };
701                        // Insert the transaction ID or solution ID into the map.
702                        match transmission_id {
703                            TransmissionID::Solution(id, _) => {
704                                seen_solution_ids.insert(id);
705                            }
706                            TransmissionID::Transaction(id, _) => {
707                                seen_transaction_ids.insert(id);
708                            }
709                            TransmissionID::Ratification => {}
710                        }
711                        // Add the transmission to the set.
712                        transmissions.insert(*transmission_id, transmission);
713                    }
714                }
715                // Trigger consensus, as this will build a new block for the ledger.
716                // Construct the subdag.
717                let subdag = Subdag::from(commit_subdag.clone())?;
718                // Retrieve the anchor round.
719                let anchor_round = subdag.anchor_round();
720                // Retrieve the number of transmissions.
721                let num_transmissions = transmissions.len();
722                // Retrieve metadata about the subdag.
723                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
724
725                // Ensure the subdag anchor round matches the leader round.
726                ensure!(
727                    anchor_round == leader_round,
728                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
729                );
730
731                // Trigger consensus.
732                if let Some(consensus_sender) = self.consensus_sender.get() {
733                    // Initialize a callback sender and receiver.
734                    let (callback_sender, callback_receiver) = oneshot::channel();
735                    // Send the subdag and transmissions to consensus.
736                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
737                    // Await the callback to continue.
738                    match callback_receiver.await {
739                        Ok(Ok(())) => (), // continue
740                        Ok(Err(e)) => {
741                            guard_error!(self, "BFT failed to advance the subdag for round {anchor_round} - {e}");
742                            return Ok(());
743                        }
744                        Err(e) => {
745                            guard_error!(self, "BFT failed to receive the callback for round {anchor_round} - {e}");
746                            return Ok(());
747                        }
748                    }
749                }
750
751                guard_info!(
752                    self,
753                    "\n\nCommitting a subdag from round {anchor_round} with {num_transmissions} transmissions: {subdag_metadata:?}\n"
754                );
755            }
756
757            // Update the DAG, as the subdag was successfully included into a block.
758            let mut dag_write = self.dag.write();
759            for certificate in commit_subdag.values().flatten() {
760                dag_write.commit(certificate, self.storage().max_gc_rounds());
761            }
762        }
763
764        // Perform garbage collection based on the latest committed leader round.
765        self.storage().garbage_collect_certificates(latest_leader_round);
766
767        Ok(())
768    }
769
770    /// Returns the subdag of batch certificates to commit.
771    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
772        &self,
773        leader_certificate: BatchCertificate<N>,
774    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
775        // Initialize a map for the certificates to commit.
776        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
777        // Initialize a set for the already ordered certificates.
778        let mut already_ordered = HashSet::new();
779        // Initialize a buffer for the certificates to order.
780        let mut buffer = vec![leader_certificate];
781        // Iterate over the certificates to order.
782        while let Some(certificate) = buffer.pop() {
783            // Insert the certificate into the map.
784            commit.entry(certificate.round()).or_default().insert(certificate.clone());
785
786            // Check if the previous certificate is below the GC round.
787            let previous_round = certificate.round().saturating_sub(1);
788            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
789                continue;
790            }
791            // Iterate over the previous certificate IDs.
792            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
793            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
794            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
795                // If the previous certificate is already ordered, continue.
796                if already_ordered.contains(previous_certificate_id) {
797                    continue;
798                }
799                // If the previous certificate was recently committed, continue.
800                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
801                    continue;
802                }
803                // If the previous certificate already exists in the ledger, continue.
804                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
805                    continue;
806                }
807
808                // Retrieve the previous certificate.
809                let previous_certificate = {
810                    // Start by retrieving the previous certificate from the DAG.
811                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
812                        // If the previous certificate is found, return it.
813                        Some(previous_certificate) => previous_certificate,
814                        // If the previous certificate is not found, retrieve it from the storage.
815                        None => match self.storage().get_certificate(*previous_certificate_id) {
816                            // If the previous certificate is found, return it.
817                            Some(previous_certificate) => previous_certificate,
818                            // Otherwise, the previous certificate is missing, and throw an error.
819                            None => bail!(
820                                "Missing previous certificate {} for round {previous_round}",
821                                fmt_id(previous_certificate_id)
822                            ),
823                        },
824                    }
825                };
826                // Insert the previous certificate into the set of already ordered certificates.
827                already_ordered.insert(previous_certificate.id());
828                // Insert the previous certificate into the buffer.
829                buffer.push(previous_certificate);
830            }
831        }
832        // Ensure we only retain certificates that are above the GC round.
833        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
834        // Return the certificates to commit.
835        Ok(commit)
836    }
837
838    /// Returns `true` if there is a path from the previous certificate to the current certificate.
839    fn is_linked(
840        &self,
841        previous_certificate: BatchCertificate<N>,
842        current_certificate: BatchCertificate<N>,
843    ) -> Result<bool> {
844        // Initialize the list containing the traversal.
845        let mut traversal = vec![current_certificate.clone()];
846        // Iterate over the rounds from the current certificate to the previous certificate.
847        for round in (previous_certificate.round()..current_certificate.round()).rev() {
848            // Retrieve all of the certificates for this past round.
849            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
850                // This is a critical error, as the traversal should have these certificates.
851                // If this error is hit, it is likely that the maximum GC rounds should be increased.
852                bail!("BFT failed to retrieve the certificates for past round {round}");
853            };
854            // Filter the certificates to only include those that are in the traversal.
855            traversal = certificates
856                .into_values()
857                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
858                .collect();
859        }
860        Ok(traversal.contains(&previous_certificate))
861    }
862}
863
864impl<N: Network> BFT<N> {
865    /// Starts the BFT handlers.
866    fn start_handlers(&self, bft_receiver: BFTReceiver<N>) {
867        let BFTReceiver { mut rx_primary_round, mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup } =
868            bft_receiver;
869
870        // Process the current round from the primary.
871        let self_ = self.clone();
872        self.spawn(async move {
873            while let Some((current_round, callback)) = rx_primary_round.recv().await {
874                callback.send(self_.update_to_next_round(current_round)).ok();
875            }
876        });
877
878        // Process the certificate from the primary.
879        let self_ = self.clone();
880        self.spawn(async move {
881            while let Some((certificate, callback)) = rx_primary_certificate.recv().await {
882                // Update the DAG with the certificate.
883                let result = self_.update_dag::<true, false>(certificate).await;
884                // Send the callback **after** updating the DAG.
885                // Note: We must await the DAG update before proceeding.
886                callback.send(result).ok();
887            }
888        });
889
890        // Process the request to sync the BFT DAG at bootup.
891        let self_ = self.clone();
892        self.spawn(async move {
893            while let Some(certificates) = rx_sync_bft_dag_at_bootup.recv().await {
894                self_.sync_bft_dag_at_bootup(certificates).await;
895            }
896        });
897    }
898
899    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
900    /// already exist in the ledger.
901    ///
902    /// This method commits all the certificates into the DAG.
903    /// Note that there is no need to insert the certificates into the DAG, because these certificates
904    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
905    async fn sync_bft_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
906        // Acquire the BFT write lock.
907        let mut dag = self.dag.write();
908
909        // Commit all the certificates.
910        for certificate in certificates {
911            dag.commit(&certificate, self.storage().max_gc_rounds());
912        }
913    }
914
915    /// Spawns a task with the given future; it should only be used for long-running tasks.
916    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
917        self.handles.lock().push(tokio::spawn(future));
918    }
919
920    /// Shuts down the BFT.
921    pub async fn shut_down(&self) {
922        guard_info!(self, "Shutting down the BFT...");
923        // Acquire the lock.
924        let _lock = self.bft_lock.lock().await;
925
926        // Shut down the primary.
927        self.primary.shut_down().await;
928
929        // Abort BFT tasks.
930        let mut handles = self.handles.lock();
931        handles.iter().for_each(|handle| handle.abort());
932        handles.clear();
933    }
934}
935
936#[cfg(test)]
937mod tests {
938    use crate::{
939        BFT,
940        DEVELOPMENT_MODE_RNG_SEED,
941        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
942        helpers::{Storage, amareleo_storage_mode, default_ledger_dir},
943    };
944
945    use amareleo_chain_account::Account;
946    use amareleo_node_bft_ledger_service::MockLedgerService;
947    use amareleo_node_bft_storage_service::BFTMemoryService;
948    use snarkvm::{
949        console::account::{Address, PrivateKey},
950        ledger::{
951            committee::Committee,
952            narwhal::batch_certificate::test_helpers::{sample_batch_certificate, sample_batch_certificate_for_round},
953        },
954        utilities::TestRng,
955    };
956
957    use aleo_std::StorageMode;
958    use anyhow::Result;
959    use indexmap::{IndexMap, IndexSet};
960    use rand::SeedableRng;
961    use rand_chacha::ChaChaRng;
962    use std::sync::Arc;
963
964    type CurrentNetwork = snarkvm::console::network::MainnetV0;
965
966    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
967    fn sample_test_instance(
968        committee_round: Option<u64>,
969        max_gc_rounds: u64,
970        rng: &mut TestRng,
971    ) -> (
972        Committee<CurrentNetwork>,
973        Account<CurrentNetwork>,
974        Arc<MockLedgerService<CurrentNetwork>>,
975        Storage<CurrentNetwork>,
976    ) {
977        let committee = match committee_round {
978            Some(round) => snarkvm::ledger::committee::test_helpers::sample_committee_for_round(round, rng),
979            None => snarkvm::ledger::committee::test_helpers::sample_committee(rng),
980        };
981        let account = Account::new(rng).unwrap();
982        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
983        let transmissions = Arc::new(BFTMemoryService::new());
984        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
985
986        (committee, account, ledger, storage)
987    }
988
989    #[test]
990    #[tracing_test::traced_test]
991    fn test_is_leader_quorum_odd() -> Result<()> {
992        let rng = &mut TestRng::default();
993
994        // Sample batch certificates.
995        let mut certificates = IndexSet::new();
996        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
997        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
998        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
999        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1000
1001        // Initialize the committee.
1002        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1003            1,
1004            vec![
1005                certificates[0].author(),
1006                certificates[1].author(),
1007                certificates[2].author(),
1008                certificates[3].author(),
1009            ],
1010            rng,
1011        );
1012
1013        // Initialize the ledger.
1014        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1015        // Initialize the storage.
1016        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1017        // Initialize the account.
1018        let account = Account::new(rng)?;
1019        // Initialize the BFT.
1020        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1021        assert!(bft.is_timer_expired());
1022        // Ensure this call succeeds on an odd round.
1023        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1024        // If timer has expired but quorum threshold is not reached, return 'false'.
1025        assert!(!result);
1026        // Insert certificates into storage.
1027        for certificate in certificates.iter() {
1028            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1029        }
1030        // Ensure this call succeeds on an odd round.
1031        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1032        assert!(result); // no previous leader certificate
1033        // Set the leader certificate.
1034        let leader_certificate = sample_batch_certificate(rng);
1035        *bft.leader_certificate.write() = Some(leader_certificate);
1036        // Ensure this call succeeds on an odd round.
1037        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1038        assert!(result); // should now fall through to the end of function
1039
1040        Ok(())
1041    }
1042
1043    #[test]
1044    #[tracing_test::traced_test]
1045    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1046        let rng = &mut TestRng::default();
1047
1048        // Sample the test instance.
1049        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1050        assert_eq!(committee.starting_round(), 1);
1051        assert_eq!(storage.current_round(), 1);
1052        assert_eq!(storage.max_gc_rounds(), 10);
1053
1054        // Initialize the BFT.
1055        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1056        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1057
1058        // Store is at round 1, and we are checking for round 2.
1059        // Ensure this call fails on an even round.
1060        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1061        assert!(!result);
1062        Ok(())
1063    }
1064
1065    #[test]
1066    #[tracing_test::traced_test]
1067    fn test_is_leader_quorum_even() -> Result<()> {
1068        let rng = &mut TestRng::default();
1069
1070        // Sample the test instance.
1071        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1072        assert_eq!(committee.starting_round(), 2);
1073        assert_eq!(storage.current_round(), 2);
1074        assert_eq!(storage.max_gc_rounds(), 10);
1075
1076        // Initialize the BFT.
1077        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1078        assert!(bft.is_timer_expired()); // 0 + 5 < now()
1079
1080        // Ensure this call fails on an even round.
1081        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1082        assert!(!result);
1083        Ok(())
1084    }
1085
1086    #[test]
1087    #[tracing_test::traced_test]
1088    fn test_is_even_round_ready() -> Result<()> {
1089        let rng = &mut TestRng::default();
1090
1091        // Sample batch certificates.
1092        let mut certificates = IndexSet::new();
1093        certificates.insert(sample_batch_certificate_for_round(2, rng));
1094        certificates.insert(sample_batch_certificate_for_round(2, rng));
1095        certificates.insert(sample_batch_certificate_for_round(2, rng));
1096        certificates.insert(sample_batch_certificate_for_round(2, rng));
1097
1098        // Initialize the committee.
1099        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1100            2,
1101            vec![
1102                certificates[0].author(),
1103                certificates[1].author(),
1104                certificates[2].author(),
1105                certificates[3].author(),
1106            ],
1107            rng,
1108        );
1109
1110        // Initialize the ledger.
1111        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1112        // Initialize the storage.
1113        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1114        // Initialize the account.
1115        let account = Account::new(rng)?;
1116        // Initialize the BFT.
1117        let bft = BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1118        // Set the leader certificate.
1119        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1120        *bft.leader_certificate.write() = Some(leader_certificate);
1121        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1122        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1123        assert!(!result);
1124        // Once quorum threshold is reached, we are ready for the next round.
1125        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1126        assert!(result);
1127
1128        // Initialize a new BFT.
1129        let bft_timer =
1130            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1131        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1132        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1133        if !bft_timer.is_timer_expired() {
1134            assert!(!result);
1135        }
1136        // Wait for the timer to expire.
1137        let leader_certificate_timeout =
1138            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1139        std::thread::sleep(leader_certificate_timeout);
1140        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1141        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1142        if bft_timer.is_timer_expired() {
1143            assert!(result);
1144        } else {
1145            assert!(!result);
1146        }
1147
1148        Ok(())
1149    }
1150
1151    #[test]
1152    #[tracing_test::traced_test]
1153    fn test_update_leader_certificate_odd() -> Result<()> {
1154        let rng = &mut TestRng::default();
1155
1156        // Sample the test instance.
1157        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1158        assert_eq!(storage.max_gc_rounds(), 10);
1159
1160        // Initialize the BFT.
1161        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1162
1163        // Ensure this call fails on an odd round.
1164        let result = bft.update_leader_certificate_to_even_round(1);
1165        assert!(!result);
1166        Ok(())
1167    }
1168
1169    #[test]
1170    #[tracing_test::traced_test]
1171    fn test_update_leader_certificate_bad_round() -> Result<()> {
1172        let rng = &mut TestRng::default();
1173
1174        // Sample the test instance.
1175        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1176        assert_eq!(storage.max_gc_rounds(), 10);
1177
1178        // Initialize the BFT.
1179        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1180
1181        // Ensure this call succeeds on an even round.
1182        let result = bft.update_leader_certificate_to_even_round(6);
1183        assert!(!result);
1184        Ok(())
1185    }
1186
1187    #[test]
1188    #[tracing_test::traced_test]
1189    fn test_update_leader_certificate_even() -> Result<()> {
1190        let rng = &mut TestRng::default();
1191
1192        // Set the current round.
1193        let current_round = 3;
1194
1195        // Sample the certificates.
1196        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1197            current_round,
1198            rng,
1199        );
1200
1201        // Initialize the committee.
1202        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1203            2,
1204            vec![
1205                certificates[0].author(),
1206                certificates[1].author(),
1207                certificates[2].author(),
1208                certificates[3].author(),
1209            ],
1210            rng,
1211        );
1212
1213        // Initialize the ledger.
1214        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1215
1216        // Initialize the storage.
1217        let transmissions = Arc::new(BFTMemoryService::new());
1218        let storage = Storage::new(ledger.clone(), transmissions, 10, None);
1219        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1220        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1221        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1222        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1223        assert_eq!(storage.current_round(), 2);
1224
1225        // Retrieve the leader certificate.
1226        let leader = certificates[0].author(); // committee.get_leader(2).unwrap();
1227        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1228
1229        // Initialize the BFT.
1230        let account = Account::new(rng)?;
1231        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1232
1233        // Set the leader certificate.
1234        *bft.leader_certificate.write() = Some(leader_certificate);
1235
1236        // Update the leader certificate.
1237        // Ensure this call succeeds on an even round.
1238        let result = bft.update_leader_certificate_to_even_round(2);
1239        assert!(result);
1240
1241        Ok(())
1242    }
1243
1244    #[tokio::test]
1245    #[tracing_test::traced_test]
1246    async fn test_order_dag_with_dfs() -> Result<()> {
1247        let rng = &mut TestRng::default();
1248
1249        // Sample the test instance.
1250        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1251
1252        // Initialize the round parameters.
1253        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1254        let current_round = previous_round + 1;
1255
1256        // Sample the current certificate and previous certificates.
1257        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1258            current_round,
1259            rng,
1260        );
1261
1262        /* Test GC */
1263
1264        // Ensure the function succeeds in returning only certificates above GC.
1265        {
1266            // Initialize the storage.
1267            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1268            // Initialize the BFT.
1269            let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1270
1271            // Insert a mock DAG in the BFT.
1272            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1273
1274            // Insert the previous certificates into the BFT.
1275            for certificate in previous_certificates.clone() {
1276                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1277            }
1278
1279            // Ensure this call succeeds and returns all given certificates.
1280            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1281            assert!(result.is_ok());
1282            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1283            assert_eq!(candidate_certificates.len(), 1);
1284            let expected_certificates = vec![certificate.clone()];
1285            assert_eq!(
1286                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1287                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1288            );
1289            assert_eq!(candidate_certificates, expected_certificates);
1290        }
1291
1292        /* Test normal case */
1293
1294        // Ensure the function succeeds in returning all given certificates.
1295        {
1296            // Initialize the storage.
1297            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1, None);
1298            // Initialize the BFT.
1299            let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1300
1301            // Insert a mock DAG in the BFT.
1302            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1303
1304            // Insert the previous certificates into the BFT.
1305            for certificate in previous_certificates.clone() {
1306                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1307            }
1308
1309            // Ensure this call succeeds and returns all given certificates.
1310            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1311            assert!(result.is_ok());
1312            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1313            assert_eq!(candidate_certificates.len(), 5);
1314            let expected_certificates = vec![
1315                previous_certificates[0].clone(),
1316                previous_certificates[1].clone(),
1317                previous_certificates[2].clone(),
1318                previous_certificates[3].clone(),
1319                certificate,
1320            ];
1321            assert_eq!(
1322                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1323                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1324            );
1325            assert_eq!(candidate_certificates, expected_certificates);
1326        }
1327
1328        Ok(())
1329    }
1330
1331    #[test]
1332    #[tracing_test::traced_test]
1333    fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
1334        let rng = &mut TestRng::default();
1335
1336        // Sample the test instance.
1337        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
1338        assert_eq!(committee.starting_round(), 1);
1339        assert_eq!(storage.current_round(), 1);
1340        assert_eq!(storage.max_gc_rounds(), 1);
1341
1342        // Initialize the round parameters.
1343        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1344        let current_round = previous_round + 1;
1345
1346        // Sample the current certificate and previous certificates.
1347        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1348            current_round,
1349            rng,
1350        );
1351        // Construct the previous certificate IDs.
1352        let previous_certificate_ids: IndexSet<_> = previous_certificates.iter().map(|c| c.id()).collect();
1353
1354        /* Test missing previous certificate. */
1355
1356        // Initialize the BFT.
1357        let bft = BFT::new(account, storage, false, StorageMode::Development(0), ledger, None)?;
1358
1359        // The expected error message.
1360        let error_msg = format!(
1361            "Missing previous certificate {} for round {previous_round}",
1362            crate::helpers::fmt_id(previous_certificate_ids[3]),
1363        );
1364
1365        // Ensure this call fails on a missing previous certificate.
1366        let result = bft.order_dag_with_dfs::<false>(certificate);
1367        assert!(result.is_err());
1368        assert_eq!(result.unwrap_err().to_string(), error_msg);
1369        Ok(())
1370    }
1371
1372    #[tokio::test]
1373    #[tracing_test::traced_test]
1374    async fn test_bft_gc_on_commit() -> Result<()> {
1375        let rng = &mut TestRng::default();
1376
1377        // Initialize the round parameters.
1378        let max_gc_rounds = 1;
1379        let committee_round = 0;
1380        let commit_round = 2;
1381        let current_round = commit_round + 1;
1382
1383        // Sample the certificates.
1384        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1385            current_round,
1386            rng,
1387        );
1388
1389        // Initialize the committee.
1390        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1391            committee_round,
1392            vec![
1393                certificates[0].author(),
1394                certificates[1].author(),
1395                certificates[2].author(),
1396                certificates[3].author(),
1397            ],
1398            rng,
1399        );
1400
1401        // Initialize the ledger.
1402        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1403
1404        // Initialize the storage.
1405        let transmissions = Arc::new(BFTMemoryService::new());
1406        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds, None);
1407        // Insert the certificates into the storage.
1408        for certificate in certificates.iter() {
1409            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1410        }
1411
1412        // Get the leader certificate.
1413        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1414        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1415
1416        // Initialize the BFT.
1417        let account = Account::new(rng)?;
1418        let bft = BFT::new(account, storage.clone(), false, StorageMode::Development(0), ledger, None)?;
1419        // Insert a mock DAG in the BFT.
1420        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1421
1422        // Ensure that the `gc_round` has not been updated yet.
1423        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1424
1425        // Insert the certificates into the BFT.
1426        for certificate in certificates {
1427            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1428        }
1429
1430        // Commit the leader certificate.
1431        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1432
1433        // Ensure that the `gc_round` has been updated.
1434        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1435
1436        Ok(())
1437    }
1438
1439    #[tokio::test]
1440    #[tracing_test::traced_test]
1441    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1442        let rng = &mut TestRng::default();
1443
1444        // Initialize the round parameters.
1445        let max_gc_rounds = 1;
1446        let committee_round = 0;
1447        let commit_round = 2;
1448        let current_round = commit_round + 1;
1449
1450        // Sample the current certificate and previous certificates.
1451        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1452            current_round,
1453            rng,
1454        );
1455
1456        // Initialize the committee.
1457        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1458            committee_round,
1459            vec![
1460                certificates[0].author(),
1461                certificates[1].author(),
1462                certificates[2].author(),
1463                certificates[3].author(),
1464            ],
1465            rng,
1466        );
1467
1468        // Initialize the ledger.
1469        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1470
1471        // Initialize the storage.
1472        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1473        // Insert the certificates into the storage.
1474        for certificate in certificates.iter() {
1475            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1476        }
1477
1478        // Get the leader certificate.
1479        let leader = certificates[0].author(); // committee.get_leader(commit_round).unwrap();
1480        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1481
1482        // Initialize the BFT.
1483        let account = Account::new(rng)?;
1484        let bft = BFT::new(account.clone(), storage, false, StorageMode::Development(0), ledger.clone(), None)?;
1485
1486        // Insert a mock DAG in the BFT.
1487        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1488
1489        // Insert the previous certificates into the BFT.
1490        for certificate in certificates.clone() {
1491            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1492        }
1493
1494        // Commit the leader certificate.
1495        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1496
1497        // Simulate a bootup of the BFT.
1498
1499        // Initialize a new instance of storage.
1500        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1501        // Initialize a new instance of BFT.
1502        let bootup_bft = BFT::new(account, storage_2, false, StorageMode::Development(0), ledger, None)?;
1503
1504        // Sync the BFT DAG at bootup.
1505        bootup_bft.sync_bft_dag_at_bootup(certificates.clone()).await;
1506
1507        // Check that the BFT starts from the same last committed round.
1508        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1509
1510        // Ensure that both BFTs have committed the leader certificate.
1511        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1512        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1513
1514        // Check the state of the bootup BFT.
1515        for certificate in certificates {
1516            let certificate_round = certificate.round();
1517            let certificate_id = certificate.id();
1518            // Check that the bootup BFT has committed the certificates.
1519            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1520            // Check that the bootup BFT does not contain the certificates in its graph, because
1521            // it should not need to order them again in subsequent subdags.
1522            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1523        }
1524
1525        Ok(())
1526    }
1527
1528    #[tokio::test]
1529    #[tracing_test::traced_test]
1530    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1531        /*
1532        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1533        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1534        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1535        */
1536
1537        let rng = &mut TestRng::default();
1538        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1539
1540        let private_keys = vec![
1541            PrivateKey::new(rng_pks).unwrap(),
1542            PrivateKey::new(rng_pks).unwrap(),
1543            PrivateKey::new(rng_pks).unwrap(),
1544            PrivateKey::new(rng_pks).unwrap(),
1545        ];
1546        let address0 = Address::try_from(private_keys[0])?;
1547
1548        // Initialize the round parameters.
1549        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1550        let committee_round = 0;
1551        let commit_round = 2;
1552        let current_round = commit_round + 1;
1553        let next_round = current_round + 1;
1554
1555        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1556        let (round_to_certificates_map, committee) = {
1557            let addresses = vec![
1558                Address::try_from(private_keys[0])?,
1559                Address::try_from(private_keys[1])?,
1560                Address::try_from(private_keys[2])?,
1561                Address::try_from(private_keys[3])?,
1562            ];
1563            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1564                committee_round,
1565                addresses,
1566                rng,
1567            );
1568            // Initialize a mapping from the round number to the set of batch certificates in the round.
1569            let mut round_to_certificates_map: IndexMap<
1570                u64,
1571                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1572            > = IndexMap::new();
1573            let mut previous_certificates = IndexSet::with_capacity(4);
1574            // Initialize the genesis batch certificates.
1575            for _ in 0..4 {
1576                previous_certificates.insert(sample_batch_certificate(rng));
1577            }
1578            for round in 0..commit_round + 3 {
1579                let mut current_certificates = IndexSet::new();
1580                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1581                    IndexSet::new()
1582                } else {
1583                    previous_certificates.iter().map(|c| c.id()).collect()
1584                };
1585                let transmission_ids =
1586                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1587                        .into_iter()
1588                        .collect::<IndexSet<_>>();
1589                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1590                let committee_id = committee.id();
1591                for (i, private_key_1) in private_keys.iter().enumerate() {
1592                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1593                        private_key_1,
1594                        round,
1595                        timestamp,
1596                        committee_id,
1597                        transmission_ids.clone(),
1598                        previous_certificate_ids.clone(),
1599                        rng,
1600                    )
1601                    .unwrap();
1602                    let mut signatures = IndexSet::with_capacity(4);
1603                    for (j, private_key_2) in private_keys.iter().enumerate() {
1604                        if i != j {
1605                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1606                        }
1607                    }
1608                    let certificate =
1609                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1610                    current_certificates.insert(certificate);
1611                }
1612                // Update the mapping.
1613                round_to_certificates_map.insert(round, current_certificates.clone());
1614                previous_certificates = current_certificates.clone();
1615            }
1616            (round_to_certificates_map, committee)
1617        };
1618
1619        // Initialize the ledger.
1620        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1621        // Initialize the storage.
1622        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1623        // Get the leaders for the next 2 commit rounds.
1624        let leader = address0; // committee.get_leader(commit_round).unwrap();
1625        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1626        // Insert the pre shutdown certificates into the storage.
1627        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1628        for i in 1..=commit_round {
1629            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1630            if i == commit_round {
1631                // Only insert the leader certificate for the commit round.
1632                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1633                if let Some(c) = leader_certificate {
1634                    pre_shutdown_certificates.push(c.clone());
1635                }
1636                continue;
1637            }
1638            pre_shutdown_certificates.extend(certificates);
1639        }
1640        for certificate in pre_shutdown_certificates.iter() {
1641            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1642        }
1643        // Insert the post shutdown certificates into the storage.
1644        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1645            Vec::new();
1646        for j in commit_round..=commit_round + 2 {
1647            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1648            post_shutdown_certificates.extend(certificate);
1649        }
1650        for certificate in post_shutdown_certificates.iter() {
1651            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1652        }
1653        // Get the leader certificates.
1654        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1655        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1656
1657        // Initialize the BFT without bootup.
1658        let account = Account::try_from(private_keys[0])?;
1659        let ledger_dir = default_ledger_dir(1, true, "0");
1660        let storage_mode = amareleo_storage_mode(ledger_dir);
1661        let bft = BFT::new(account.clone(), storage, true, storage_mode.clone(), ledger.clone(), None)?;
1662
1663        // Insert a mock DAG in the BFT without bootup.
1664        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1665
1666        // Insert the certificates into the BFT without bootup.
1667        for certificate in pre_shutdown_certificates.clone() {
1668            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1669        }
1670
1671        // Insert the post shutdown certificates into the BFT without bootup.
1672        for certificate in post_shutdown_certificates.clone() {
1673            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1674        }
1675        // Commit the second leader certificate.
1676        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1677        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1678        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1679
1680        // Simulate a bootup of the BFT.
1681
1682        // Initialize a new instance of storage.
1683        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1684
1685        // Initialize a new instance of BFT with bootup.
1686        let bootup_bft = BFT::new(account, bootup_storage.clone(), true, storage_mode.clone(), ledger.clone(), None)?;
1687
1688        // Sync the BFT DAG at bootup.
1689        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1690
1691        // Insert the post shutdown certificates to the storage and BFT with bootup.
1692        for certificate in post_shutdown_certificates.iter() {
1693            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1694        }
1695        for certificate in post_shutdown_certificates.clone() {
1696            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1697        }
1698        // Commit the second leader certificate.
1699        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1700        let commit_subdag_metadata_bootup =
1701            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1702        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1703        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1704
1705        // Check that the final state of both BFTs is the same.
1706
1707        // Check that both BFTs start from the same last committed round.
1708        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1709
1710        // Ensure that both BFTs have committed the leader certificates.
1711        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1712        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1713        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1714        assert!(
1715            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1716        );
1717
1718        // Check that the bootup BFT has committed the pre shutdown certificates.
1719        for certificate in pre_shutdown_certificates.clone() {
1720            let certificate_round = certificate.round();
1721            let certificate_id = certificate.id();
1722            // Check that both BFTs have committed the certificates.
1723            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1724            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1725            // Check that the bootup BFT does not contain the certificates in its graph, because
1726            // it should not need to order them again in subsequent subdags.
1727            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1728            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1729        }
1730
1731        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1732        for certificate in committed_certificates_bootup.clone() {
1733            let certificate_round = certificate.round();
1734            let certificate_id = certificate.id();
1735            // Check that the both BFTs have committed the certificates.
1736            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1737            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1738            // Check that the bootup BFT does not contain the certificates in its graph, because
1739            // it should not need to order them again in subsequent subdags.
1740            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1741            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1742        }
1743
1744        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1745        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1746
1747        Ok(())
1748    }
1749
1750    #[tokio::test]
1751    #[tracing_test::traced_test]
1752    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1753        /*
1754        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1755        2. Add post shutdown certificates to the bootup BFT.
1756        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1757        */
1758
1759        let rng = &mut TestRng::default();
1760        let rng_pks = &mut ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1761
1762        let private_keys = vec![
1763            PrivateKey::new(rng_pks).unwrap(),
1764            PrivateKey::new(rng_pks).unwrap(),
1765            PrivateKey::new(rng_pks).unwrap(),
1766            PrivateKey::new(rng_pks).unwrap(),
1767        ];
1768        let address0 = Address::try_from(private_keys[0])?;
1769
1770        // Initialize the round parameters.
1771        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1772        let committee_round = 0;
1773        let commit_round = 2;
1774        let current_round = commit_round + 1;
1775        let next_round = current_round + 1;
1776
1777        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1778        let (round_to_certificates_map, committee) = {
1779            let addresses = vec![
1780                Address::try_from(private_keys[0])?,
1781                Address::try_from(private_keys[1])?,
1782                Address::try_from(private_keys[2])?,
1783                Address::try_from(private_keys[3])?,
1784            ];
1785            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1786                committee_round,
1787                addresses,
1788                rng,
1789            );
1790            // Initialize a mapping from the round number to the set of batch certificates in the round.
1791            let mut round_to_certificates_map: IndexMap<
1792                u64,
1793                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1794            > = IndexMap::new();
1795            let mut previous_certificates = IndexSet::with_capacity(4);
1796            // Initialize the genesis batch certificates.
1797            for _ in 0..4 {
1798                previous_certificates.insert(sample_batch_certificate(rng));
1799            }
1800            for round in 0..=commit_round + 2 {
1801                let mut current_certificates = IndexSet::new();
1802                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1803                    IndexSet::new()
1804                } else {
1805                    previous_certificates.iter().map(|c| c.id()).collect()
1806                };
1807                let transmission_ids =
1808                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1809                        .into_iter()
1810                        .collect::<IndexSet<_>>();
1811                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1812                let committee_id = committee.id();
1813                for (i, private_key_1) in private_keys.iter().enumerate() {
1814                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1815                        private_key_1,
1816                        round,
1817                        timestamp,
1818                        committee_id,
1819                        transmission_ids.clone(),
1820                        previous_certificate_ids.clone(),
1821                        rng,
1822                    )
1823                    .unwrap();
1824                    let mut signatures = IndexSet::with_capacity(4);
1825                    for (j, private_key_2) in private_keys.iter().enumerate() {
1826                        if i != j {
1827                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1828                        }
1829                    }
1830                    let certificate =
1831                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1832                    current_certificates.insert(certificate);
1833                }
1834                // Update the mapping.
1835                round_to_certificates_map.insert(round, current_certificates.clone());
1836                previous_certificates = current_certificates.clone();
1837            }
1838            (round_to_certificates_map, committee)
1839        };
1840
1841        // Initialize the ledger.
1842        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1843        // Initialize the storage.
1844        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds, None);
1845        // Get the leaders for the next 2 commit rounds.
1846        let leader = address0; // committee.get_leader(commit_round).unwrap();
1847        let next_leader = address0; // committee.get_leader(next_round).unwrap();
1848        // Insert the pre shutdown certificates into the storage.
1849        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1850        for i in 1..=commit_round {
1851            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1852            if i == commit_round {
1853                // Only insert the leader certificate for the commit round.
1854                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1855                if let Some(c) = leader_certificate {
1856                    pre_shutdown_certificates.push(c.clone());
1857                }
1858                continue;
1859            }
1860            pre_shutdown_certificates.extend(certificates);
1861        }
1862        for certificate in pre_shutdown_certificates.iter() {
1863            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1864        }
1865        // Initialize the bootup BFT.
1866        let account = Account::try_from(private_keys[0])?;
1867        let bootup_bft =
1868            BFT::new(account.clone(), storage.clone(), false, StorageMode::Development(0), ledger.clone(), None)?;
1869        // Insert a mock DAG in the BFT without bootup.
1870        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1871        // Sync the BFT DAG at bootup.
1872        bootup_bft.sync_bft_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1873
1874        // Insert the post shutdown certificates into the storage.
1875        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1876            Vec::new();
1877        for j in commit_round..=commit_round + 2 {
1878            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1879            post_shutdown_certificates.extend(certificate);
1880        }
1881        for certificate in post_shutdown_certificates.iter() {
1882            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1883        }
1884
1885        // Insert the post shutdown certificates into the DAG.
1886        for certificate in post_shutdown_certificates.clone() {
1887            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1888        }
1889
1890        // Get the next leader certificate to commit.
1891        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1892        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1893        let committed_certificates = commit_subdag.values().flatten();
1894
1895        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1896        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1897            for committed_certificate in committed_certificates.clone() {
1898                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1899            }
1900        }
1901        Ok(())
1902    }
1903}