Skip to main content

snarkos_node_bft/
bft.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
18    helpers::{ConsensusSender, DAG, PrimaryReceiver, PrimarySender, Storage, fmt_id, now},
19    primary::{Primary, PrimaryCallback},
20    sync::SyncCallback,
21};
22
23use snarkos_account::Account;
24use snarkos_node_bft_ledger_service::LedgerService;
25use snarkos_node_sync::{BlockSync, Ping};
26use snarkos_utilities::NodeDataDir;
27
28use snarkvm::{
29    console::account::Address,
30    ledger::{
31        block::Transaction,
32        committee::Committee,
33        narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID},
34        puzzle::{Solution, SolutionID},
35    },
36    prelude::{Field, Network, Result, bail, ensure},
37    utilities::flatten_error,
38};
39
40use anyhow::Context;
41use colored::Colorize;
42use indexmap::{IndexMap, IndexSet};
43#[cfg(feature = "locktick")]
44use locktick::{parking_lot::RwLock, tokio::Mutex as TMutex};
45#[cfg(not(feature = "locktick"))]
46use parking_lot::RwLock;
47use std::{
48    collections::{BTreeMap, HashSet},
49    net::SocketAddr,
50    sync::{
51        Arc,
52        atomic::{AtomicI64, Ordering},
53    },
54};
55#[cfg(not(feature = "locktick"))]
56use tokio::sync::Mutex as TMutex;
57use tokio::sync::{OnceCell, oneshot};
58
59#[derive(Clone)]
60pub struct BFT<N: Network> {
61    /// The primary for this node.
62    primary: Primary<N>,
63    /// The DAG of batches from which we build the blockchain.
64    dag: Arc<RwLock<DAG<N>>>,
65    /// The batch certificate of the leader from the current even round, if one was present.
66    leader_certificate: Arc<RwLock<Option<BatchCertificate<N>>>>,
67    /// The timer for the leader certificate to be received.
68    leader_certificate_timer: Arc<AtomicI64>,
69    /// The consensus sender.
70    consensus_sender: Arc<OnceCell<ConsensusSender<N>>>,
71    /// The BFT lock.
72    lock: Arc<TMutex<()>>,
73}
74
75impl<N: Network> BFT<N> {
76    /// Initializes a new instance of the BFT.
77    #[allow(clippy::too_many_arguments)]
78    pub fn new(
79        account: Account<N>,
80        storage: Storage<N>,
81        ledger: Arc<dyn LedgerService<N>>,
82        block_sync: Arc<BlockSync<N>>,
83        ip: Option<SocketAddr>,
84        trusted_validators: &[SocketAddr],
85        trusted_peers_only: bool,
86        node_data_dir: NodeDataDir,
87        dev: Option<u16>,
88    ) -> Result<Self> {
89        Ok(Self {
90            primary: Primary::new(
91                account,
92                storage,
93                ledger,
94                block_sync,
95                ip,
96                trusted_validators,
97                trusted_peers_only,
98                node_data_dir,
99                dev,
100            )?,
101            dag: Default::default(),
102            leader_certificate: Default::default(),
103            leader_certificate_timer: Default::default(),
104            consensus_sender: Default::default(),
105            lock: Default::default(),
106        })
107    }
108
109    /// Run the BFT instance.
110    ///
111    /// This will return as soon as all required tasks are spawned.
112    /// The function must not be called more than once per instance.
113    pub async fn run(
114        &mut self,
115        ping: Option<Arc<Ping<N>>>,
116        consensus_sender: Option<ConsensusSender<N>>,
117        primary_sender: PrimarySender<N>,
118        primary_receiver: PrimaryReceiver<N>,
119    ) -> Result<()> {
120        info!("Starting the BFT instance...");
121        // Set up callbacks.
122        let primary_callback = Some(Arc::new(self.clone()) as Arc<dyn PrimaryCallback<N>>);
123
124        let sync_callback = Some(Arc::new(self.clone()) as Arc<dyn SyncCallback<N>>);
125
126        // Next, run the primary instance.
127        self.primary.run(ping, primary_callback, sync_callback, primary_sender, primary_receiver).await?;
128
129        // Lastly, set the consensus sender.
130        // Note: This ensures that the BFT does not advance the ledger during initial syncing.
131        if let Some(consensus_sender) = consensus_sender {
132            self.consensus_sender.set(consensus_sender).expect("Consensus sender already set");
133        }
134        Ok(())
135    }
136
137    /// Returns `true` if the primary is synced.
138    pub fn is_synced(&self) -> bool {
139        self.primary.is_synced()
140    }
141
142    /// Returns the primary.
143    pub const fn primary(&self) -> &Primary<N> {
144        &self.primary
145    }
146
147    /// Returns the storage.
148    pub const fn storage(&self) -> &Storage<N> {
149        self.primary.storage()
150    }
151
152    /// Returns the ledger.
153    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
154        self.primary.ledger()
155    }
156
157    /// Returns the leader of the current even round, if one was present.
158    pub fn leader(&self) -> Option<Address<N>> {
159        self.leader_certificate.read().as_ref().map(|certificate| certificate.author())
160    }
161
162    /// Returns the certificate of the leader from the current even round, if one was present.
163    pub const fn leader_certificate(&self) -> &Arc<RwLock<Option<BatchCertificate<N>>>> {
164        &self.leader_certificate
165    }
166}
167
168impl<N: Network> BFT<N> {
169    /// Returns the number of unconfirmed transmissions.
170    pub fn num_unconfirmed_transmissions(&self) -> usize {
171        self.primary.num_unconfirmed_transmissions()
172    }
173
174    /// Returns the number of unconfirmed ratifications.
175    pub fn num_unconfirmed_ratifications(&self) -> usize {
176        self.primary.num_unconfirmed_ratifications()
177    }
178
179    /// Returns the number of solutions.
180    pub fn num_unconfirmed_solutions(&self) -> usize {
181        self.primary.num_unconfirmed_solutions()
182    }
183
184    /// Returns the number of unconfirmed transactions.
185    pub fn num_unconfirmed_transactions(&self) -> usize {
186        self.primary.num_unconfirmed_transactions()
187    }
188}
189
190impl<N: Network> BFT<N> {
191    /// Returns the worker transmission IDs.
192    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
193        self.primary.worker_transmission_ids()
194    }
195
196    /// Returns the worker transmissions.
197    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
198        self.primary.worker_transmissions()
199    }
200
201    /// Returns the worker solutions.
202    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
203        self.primary.worker_solutions()
204    }
205
206    /// Returns the worker transactions.
207    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
208        self.primary.worker_transactions()
209    }
210}
211
212#[async_trait::async_trait]
213impl<N: Network> PrimaryCallback<N> for BFT<N> {
214    /// Notification that a new round has started.
215    fn update_to_next_round(&self, current_round: u64) -> bool {
216        // Ensure the current round is at least the storage round (this is a sanity check).
217        let storage_round = self.storage().current_round();
218        if current_round < storage_round {
219            debug!(
220                "BFT is safely skipping an update for round {current_round}, as storage is at round {storage_round}"
221            );
222            return false;
223        }
224
225        // Determine if the BFT is ready to update to the next round.
226        let is_ready = match current_round.is_multiple_of(2) {
227            true => self.update_leader_certificate_to_even_round(current_round),
228            false => self.is_leader_quorum_or_nonleaders_available(current_round),
229        };
230
231        #[cfg(feature = "metrics")]
232        {
233            let start = self.leader_certificate_timer.load(Ordering::SeqCst);
234            // Only log if the timer was set, otherwise we get a time difference since the EPOCH.
235            if start > 0 {
236                let end = now();
237                let elapsed = std::time::Duration::from_secs((end - start) as u64);
238                metrics::histogram(metrics::bft::COMMIT_ROUNDS_LATENCY, elapsed.as_secs_f64());
239            }
240        }
241
242        // Log whether the round is going to update.
243        if current_round.is_multiple_of(2) {
244            // Determine if there is a leader certificate.
245            if let Some(leader_certificate) = self.leader_certificate.read().as_ref() {
246                // Ensure the state of the leader certificate is consistent with the BFT being ready.
247                if !is_ready {
248                    trace!(is_ready, "BFT - A leader certificate was found, but 'is_ready' is false");
249                }
250                // Log the leader election.
251                let leader_round = leader_certificate.round();
252                match leader_round == current_round {
253                    true => {
254                        info!("Round {current_round} elected a leader - {}", leader_certificate.author());
255                        #[cfg(feature = "metrics")]
256                        metrics::increment_counter(metrics::bft::LEADERS_ELECTED);
257                    }
258                    false => warn!("BFT failed to elect a leader for round {current_round} (!= {leader_round})"),
259                }
260            } else {
261                match is_ready {
262                    true => info!("Round {current_round} reached quorum without a leader"),
263                    false => info!("{}", format!("Round {current_round} did not elect a leader (yet)").dimmed()),
264                }
265            }
266        }
267
268        // If the BFT is ready, then update to the next round.
269        if is_ready {
270            // Update to the next round in storage.
271            if let Err(err) = self
272                .storage()
273                .increment_to_next_round(current_round)
274                .with_context(|| format!("BFT failed to increment to the next round from round {current_round}"))
275            {
276                warn!("{}", &flatten_error(err));
277                return false;
278            }
279            // Update the timer for the leader certificate.
280            self.leader_certificate_timer.store(now(), Ordering::SeqCst);
281        }
282
283        is_ready
284    }
285
286    /// Notification about a new certificated generated by `Primary` or received by the `Primary` from a peer.
287    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
288        // Update the DAG with the certificate.
289        self.update_dag::<true, false>(certificate).await
290    }
291}
292
293#[async_trait::async_trait]
294impl<N: Network> SyncCallback<N> for BFT<N> {
295    /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must**
296    /// already exist in the ledger.
297    ///
298    /// This method commits all the certificates into the DAG.
299    /// Note that there is no need to insert the certificates into the DAG, because these certificates
300    /// already exist in the ledger and therefore do not need to be re-ordered into future committed subdags.
301    async fn sync_dag_at_bootup(&self, certificates: Vec<BatchCertificate<N>>) {
302        // Acquire the BFT write lock.
303        let mut dag = self.dag.write();
304
305        // Commit all the certificates.
306        for certificate in certificates {
307            dag.commit(&certificate, self.storage().max_gc_rounds());
308        }
309    }
310
311    /// Notification about a new certificate detected by the `Sync` instance after fetching a new block.
312    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
313        // Update the DAG with the certificate.
314        self.update_dag::<true, true>(certificate).await
315    }
316}
317
318impl<N: Network> BFT<N> {
319    /// Updates the leader certificate to the current even round,
320    /// returning `true` if the BFT is ready to update to the next round.
321    ///
322    /// This method runs on every even round, by determining the leader of the current even round,
323    /// and setting the leader certificate to their certificate in the round, if they were present.
324    fn update_leader_certificate_to_even_round(&self, even_round: u64) -> bool {
325        // Retrieve the current round.
326        let current_round = self.storage().current_round();
327        // Ensure the current round matches the given round.
328        if current_round != even_round {
329            warn!("BFT storage (at round {current_round}) is out of sync with the current even round {even_round}");
330            return false;
331        }
332
333        // If the current round is odd, return false.
334        if !current_round.is_multiple_of(2) || current_round < 2 {
335            error!("BFT cannot update the leader certificate in an odd round");
336            return false;
337        }
338
339        // Retrieve the certificates for the current round.
340        let current_certificates = self.storage().get_certificates_for_round(current_round);
341        // If there are no current certificates, set the leader certificate to 'None', and return early.
342        if current_certificates.is_empty() {
343            // Set the leader certificate to 'None'.
344            *self.leader_certificate.write() = None;
345            return false;
346        }
347
348        // Retrieve the committee lookback of the current round.
349        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
350            Ok(committee) => committee,
351            Err(err) => {
352                let err = err.context(format!(
353                    "BFT failed to retrieve the committee lookback for the even round {current_round}"
354                ));
355                warn!("{}", &flatten_error(err));
356                return false;
357            }
358        };
359        // Determine the leader of the current round.
360        let leader = match self.ledger().latest_leader() {
361            Some((cached_round, cached_leader)) if cached_round == current_round => cached_leader,
362            _ => {
363                // Compute the leader for the current round.
364                let computed_leader = match committee_lookback.get_leader(current_round) {
365                    Ok(leader) => leader,
366                    Err(err) => {
367                        let err =
368                            err.context(format!("BFT failed to compute the leader for the even round {current_round}"));
369                        error!("{}", &flatten_error(err));
370                        return false;
371                    }
372                };
373
374                // Cache the computed leader.
375                self.ledger().update_latest_leader(current_round, computed_leader);
376
377                computed_leader
378            }
379        };
380        // Find and set the leader certificate, if the leader was present in the current even round.
381        let leader_certificate = current_certificates.iter().find(|certificate| certificate.author() == leader);
382        *self.leader_certificate.write() = leader_certificate.cloned();
383
384        self.is_even_round_ready_for_next_round(current_certificates, committee_lookback, current_round)
385    }
386
387    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
388    ///  - If the leader certificate is set for the current even round.
389    ///  - The timer for the leader certificate has expired.
390    fn is_even_round_ready_for_next_round(
391        &self,
392        certificates: IndexSet<BatchCertificate<N>>,
393        committee: Committee<N>,
394        current_round: u64,
395    ) -> bool {
396        // Retrieve the authors for the current round.
397        let authors = certificates.into_iter().map(|c| c.author()).collect();
398        // Check if quorum threshold is reached.
399        if !committee.is_quorum_threshold_reached(&authors) {
400            trace!("BFT failed to reach quorum threshold in even round {current_round}");
401            return false;
402        }
403        // If the leader certificate is set for the current even round, return 'true'.
404        if let Some(leader_certificate) = self.leader_certificate.read().as_ref()
405            && leader_certificate.round() == current_round
406        {
407            return true;
408        }
409        // If the timer has expired, and we can achieve quorum threshold (N - f) without the leader, return 'true'.
410        if self.is_timer_expired() {
411            debug!("BFT (timer expired) - Advancing from round {current_round} to the next round (without the leader)");
412            return true;
413        }
414        // Otherwise, return 'false'.
415        false
416    }
417
418    /// Returns `true` if the timer for the leader certificate has expired.
419    ///
420    /// This is always true for a new BFT instance.
421    fn is_timer_expired(&self) -> bool {
422        self.leader_certificate_timer.load(Ordering::SeqCst) + MAX_LEADER_CERTIFICATE_DELAY_IN_SECS <= now()
423    }
424
425    /// Returns 'true' if the quorum threshold `(N - f)` is reached for this round under one of the following conditions:
426    ///  - The leader certificate is `None`.
427    ///  - The leader certificate is not included up to availability threshold `(f + 1)` (in the previous certificates of the current round).
428    ///  - The leader certificate timer has expired.
429    fn is_leader_quorum_or_nonleaders_available(&self, odd_round: u64) -> bool {
430        // Retrieve the current round.
431        let current_round = self.storage().current_round();
432        // Ensure the current round matches the given round.
433        if current_round != odd_round {
434            warn!("BFT storage (at round {current_round}) is out of sync with the current odd round {odd_round}");
435            return false;
436        }
437        // If the current round is even, return false.
438        if current_round % 2 != 1 {
439            error!("BFT does not compute stakes for the leader certificate in an even round");
440            return false;
441        }
442        // Retrieve the certificates for the current round.
443        let current_certificates = self.storage().get_certificates_for_round(current_round);
444        // Retrieve the committee lookback for the current round.
445        let committee_lookback = match self.ledger().get_committee_lookback_for_round(current_round) {
446            Ok(committee) => committee,
447            Err(err) => {
448                let err = err.context(format!(
449                    "BFT failed to retrieve the committee lookback for the odd round {current_round}"
450                ));
451                error!("{}", &flatten_error(err));
452                return false;
453            }
454        };
455        // Retrieve the authors of the current certificates.
456        let authors = current_certificates.clone().into_iter().map(|c| c.author()).collect();
457        // Check if quorum threshold is reached.
458        if !committee_lookback.is_quorum_threshold_reached(&authors) {
459            trace!("BFT failed reach quorum threshold in odd round {current_round}.");
460            return false;
461        }
462        // Retrieve the leader certificate.
463        let Some(leader_certificate) = self.leader_certificate.read().clone() else {
464            // If there is no leader certificate for the previous round, return 'true'.
465            return true;
466        };
467        // Compute the stake for the leader certificate.
468        let (stake_with_leader, stake_without_leader) = self.compute_stake_for_leader_certificate(
469            leader_certificate.id(),
470            current_certificates,
471            &committee_lookback,
472        );
473        // Return 'true' if any of the following conditions hold:
474        stake_with_leader >= committee_lookback.availability_threshold()
475            || stake_without_leader >= committee_lookback.quorum_threshold()
476            || self.is_timer_expired()
477    }
478
479    /// Computes the amount of stake that has & has not signed for the leader certificate.
480    fn compute_stake_for_leader_certificate(
481        &self,
482        leader_certificate_id: Field<N>,
483        current_certificates: IndexSet<BatchCertificate<N>>,
484        current_committee: &Committee<N>,
485    ) -> (u64, u64) {
486        // If there are no current certificates, return early.
487        if current_certificates.is_empty() {
488            return (0, 0);
489        }
490
491        // Initialize a tracker for the stake with the leader.
492        let mut stake_with_leader = 0u64;
493        // Initialize a tracker for the stake without the leader.
494        let mut stake_without_leader = 0u64;
495        // Iterate over the current certificates.
496        for certificate in current_certificates {
497            // Retrieve the stake for the author of the certificate.
498            let stake = current_committee.get_stake(certificate.author());
499            // Determine if the certificate includes the leader.
500            match certificate.previous_certificate_ids().iter().any(|id| *id == leader_certificate_id) {
501                // If the certificate includes the leader, add the stake to the stake with the leader.
502                true => stake_with_leader = stake_with_leader.saturating_add(stake),
503                // If the certificate does not include the leader, add the stake to the stake without the leader.
504                false => stake_without_leader = stake_without_leader.saturating_add(stake),
505            }
506        }
507        // Return the stake with the leader, and the stake without the leader.
508        (stake_with_leader, stake_without_leader)
509    }
510}
511
512impl<N: Network> BFT<N> {
513    /// Stores the certificate in the DAG, and attempts to commit one or more anchors.
514    async fn update_dag<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
515        &self,
516        certificate: BatchCertificate<N>,
517    ) -> Result<()> {
518        // Acquire the BFT lock.
519        let _lock = self.lock.lock().await;
520
521        // ### First, insert the certificate into the DAG. ###
522        // Retrieve the round of the new certificate to add to the DAG.
523        let certificate_round = certificate.round();
524
525        // Insert the certificate into the DAG.
526        self.dag.write().insert(certificate);
527
528        // ### Second, determine if a new leader certificate can be committed. ###
529        let commit_round = certificate_round.saturating_sub(1);
530
531        // Leaders are elected in even rounds.
532        // If the previous round is odd, the current round cannot commit any leader certs.
533        // Similarly, no leader certificate can be committed for round zero.
534        if !commit_round.is_multiple_of(2) || commit_round < 2 {
535            return Ok(());
536        }
537        // If the commit round is at or below the last committed round, return early.
538        if commit_round <= self.dag.read().last_committed_round() {
539            return Ok(());
540        }
541
542        /* Proceeding to check if the leader is ready to be committed. */
543        trace!("Checking if the leader is ready to be committed for round {commit_round}...");
544
545        // Retrieve the committee lookback for the commit round.
546        let committee_lookback = self.ledger().get_committee_lookback_for_round(commit_round).with_context(|| {
547            format!("BFT failed to retrieve the committee with lag for commit round {commit_round}")
548        })?;
549
550        // Either retrieve the cached leader or compute it.
551        let leader = match self.ledger().latest_leader() {
552            Some((cached_round, cached_leader)) if cached_round == commit_round => cached_leader,
553            _ => {
554                // Compute the leader for the commit round.
555                let computed_leader = committee_lookback
556                    .get_leader(commit_round)
557                    .with_context(|| format!("BFT failed to compute the leader for commit round {commit_round}"))?;
558
559                // Cache the computed leader.
560                self.ledger().update_latest_leader(commit_round, computed_leader);
561
562                computed_leader
563            }
564        };
565
566        // Retrieve the leader certificate for the commit round.
567        let Some(leader_certificate) = self.dag.read().get_certificate_for_round_with_author(commit_round, leader)
568        else {
569            trace!("BFT did not find the leader certificate for commit round {commit_round} yet");
570            return Ok(());
571        };
572        // Retrieve all of the certificates for the **certificate** round.
573        let certificates = self.dag.read().get_certificates_for_round(certificate_round).with_context(|| {
574            format!("BFT failed to retrieve the certificates for certificate round {certificate_round}")
575        })?;
576
577        // Retrieve the committee lookback for the certificate round (i.e. the round just after the commit round).
578        let certificate_committee_lookback =
579            self.ledger().get_committee_lookback_for_round(certificate_round).with_context(|| {
580                format!("BFT failed to retrieve the committee lookback for certificate round {certificate_round}")
581            })?;
582
583        // Construct a set over the authors who included the leader's certificate in the certificate round.
584        let authors = certificates
585            .values()
586            .filter_map(|c| match c.previous_certificate_ids().contains(&leader_certificate.id()) {
587                true => Some(c.author()),
588                false => None,
589            })
590            .collect();
591        // Check if the leader is ready to be committed.
592        if !certificate_committee_lookback.is_availability_threshold_reached(&authors) {
593            // If the leader is not ready to be committed, return early.
594            trace!("BFT is not ready to commit {commit_round}. Availability threshold has not been reached yet.");
595            return Ok(());
596        }
597
598        if IS_SYNCING {
599            info!("Proceeding to commit round {commit_round} with leader '{}' from block sync", fmt_id(leader));
600        } else {
601            info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader));
602        }
603
604        // Commit the leader certificate, and all previous leader certificates since the last committed round.
605        self.commit_leader_certificate::<ALLOW_LEDGER_ACCESS, IS_SYNCING>(leader_certificate).await
606    }
607
608    /// Commits the leader certificate, and all previous leader certificates since the last committed round.
609    async fn commit_leader_certificate<const ALLOW_LEDGER_ACCESS: bool, const IS_SYNCING: bool>(
610        &self,
611        leader_certificate: BatchCertificate<N>,
612    ) -> Result<()> {
613        #[cfg(feature = "metrics")]
614        let start = std::time::Instant::now();
615        #[cfg(debug_assertions)]
616        trace!("Attempting to commit leader certificate for round {}...", leader_certificate.round());
617
618        // Fetch the leader round.
619        let latest_leader_round = leader_certificate.round();
620        // Determine the list of all previous leader certificates since the last committed round.
621        // The order of the leader certificates is from **newest** to **oldest**.
622        let mut leader_certificates = vec![leader_certificate.clone()];
623        {
624            // Retrieve the leader round.
625            let leader_round = leader_certificate.round();
626
627            let mut current_certificate = leader_certificate;
628            for round in (self.dag.read().last_committed_round() + 2..=leader_round.saturating_sub(2)).rev().step_by(2)
629            {
630                // Retrieve the previous committee for the leader round.
631                let previous_committee_lookback =
632                    self.ledger().get_committee_lookback_for_round(round).with_context(|| {
633                        format!("BFT failed to retrieve a previous committee lookback for the even round {round}")
634                    })?;
635
636                // Either retrieve the cached leader or compute it.
637                let leader = match self.ledger().latest_leader() {
638                    Some((cached_round, cached_leader)) if cached_round == round => cached_leader,
639                    _ => {
640                        // Compute the leader for the commit round.
641                        let computed_leader = previous_committee_lookback
642                            .get_leader(round)
643                            .with_context(|| format!("BFT failed to compute the leader for the even round {round}"))?;
644
645                        // Cache the computed leader.
646                        self.ledger().update_latest_leader(round, computed_leader);
647
648                        computed_leader
649                    }
650                };
651                // Retrieve the previous leader certificate.
652                let Some(previous_certificate) = self.dag.read().get_certificate_for_round_with_author(round, leader)
653                else {
654                    continue;
655                };
656                // Determine if there is a path between the previous certificate and the current certificate.
657                if self.is_linked(previous_certificate.clone(), current_certificate.clone())? {
658                    // Add the previous leader certificate to the list of certificates to commit.
659                    leader_certificates.push(previous_certificate.clone());
660                    // Update the current certificate to the previous leader certificate.
661                    current_certificate = previous_certificate;
662                } else {
663                    #[cfg(debug_assertions)]
664                    trace!(
665                        "Skipping anchor for round {round} as it is not linked to the most recent committed leader certificate"
666                    );
667                }
668            }
669        }
670
671        // Iterate over the leader certificates to commit.
672        for leader_certificate in leader_certificates.into_iter().rev() {
673            // Retrieve the leader certificate round.
674            let leader_round = leader_certificate.round();
675            // Compute the commit subdag.
676            let commit_subdag = self
677                .order_dag_with_dfs::<ALLOW_LEDGER_ACCESS>(leader_certificate)
678                .with_context(|| "BFT failed to order the DAG with DFS")?;
679            // If the node is not syncing, trigger consensus, as this will build a new block for the ledger.
680            if !IS_SYNCING {
681                // Initialize a map for the deduped transmissions.
682                let mut transmissions = IndexMap::new();
683                // Initialize a map for the deduped transaction ids.
684                let mut seen_transaction_ids = IndexSet::new();
685                // Initialize a map for the deduped solution ids.
686                let mut seen_solution_ids = IndexSet::new();
687                // Start from the oldest leader certificate.
688                for certificate in commit_subdag.values().flatten() {
689                    // Retrieve the transmissions.
690                    for transmission_id in certificate.transmission_ids() {
691                        // If the transaction ID or solution ID already exists in the map, skip it.
692                        // Note: This additional check is done to ensure that we do not include duplicate
693                        // transaction IDs or solution IDs that may have a different transmission ID.
694                        match transmission_id {
695                            TransmissionID::Solution(solution_id, _) => {
696                                // If the solution already exists, skip it.
697                                if seen_solution_ids.contains(&solution_id) {
698                                    continue;
699                                }
700                            }
701                            TransmissionID::Transaction(transaction_id, _) => {
702                                // If the transaction already exists, skip it.
703                                if seen_transaction_ids.contains(transaction_id) {
704                                    continue;
705                                }
706                            }
707                            TransmissionID::Ratification => {
708                                bail!("Ratifications are currently not supported in the BFT.")
709                            }
710                        }
711                        // If the transmission already exists in the map, skip it.
712                        if transmissions.contains_key(transmission_id) {
713                            continue;
714                        }
715                        // If the transmission already exists in the ledger, skip it.
716                        // Note: On failure to read from the ledger, we skip including this transmission, out of safety.
717                        if self.ledger().contains_transmission(transmission_id).unwrap_or(true) {
718                            continue;
719                        }
720                        // Retrieve the transmission.
721                        let transmission = self.storage().get_transmission(*transmission_id).with_context(|| {
722                            format!(
723                                "BFT failed to retrieve transmission '{}.{}' from round {}",
724                                fmt_id(transmission_id),
725                                fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed(),
726                                certificate.round()
727                            )
728                        })?;
729                        // Insert the transaction ID or solution ID into the map.
730                        match transmission_id {
731                            TransmissionID::Solution(id, _) => {
732                                seen_solution_ids.insert(id);
733                            }
734                            TransmissionID::Transaction(id, _) => {
735                                seen_transaction_ids.insert(id);
736                            }
737                            TransmissionID::Ratification => {}
738                        }
739                        // Add the transmission to the set.
740                        transmissions.insert(*transmission_id, transmission);
741                    }
742                }
743                // Trigger consensus, as this will build a new block for the ledger.
744                // Construct the subdag.
745                let subdag = Subdag::from(commit_subdag.clone())?;
746                // Retrieve the anchor round.
747                let anchor_round = subdag.anchor_round();
748                // Retrieve the number of transmissions.
749                let num_transmissions = transmissions.len();
750                // Retrieve metadata about the subdag.
751                let subdag_metadata = subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
752
753                // Ensure the subdag anchor round matches the leader round.
754                ensure!(
755                    anchor_round == leader_round,
756                    "BFT failed to commit - the subdag anchor round {anchor_round} does not match the leader round {leader_round}",
757                );
758
759                // Trigger consensus.
760                if let Some(consensus_sender) = self.consensus_sender.get() {
761                    // Initialize a callback sender and receiver.
762                    let (callback_sender, callback_receiver) = oneshot::channel();
763                    // Send the subdag and transmissions to consensus.
764                    consensus_sender.tx_consensus_subdag.send((subdag, transmissions, callback_sender)).await?;
765                    // Await the callback to continue.
766                    match callback_receiver.await {
767                        Ok(Ok(())) => (), // continue
768                        Ok(Err(err)) => {
769                            let err = err.context(format!("BFT failed to advance the subdag for round {anchor_round}"));
770                            error!("{}", &flatten_error(err));
771                            return Ok(());
772                        }
773                        Err(err) => {
774                            let err: anyhow::Error = err.into();
775                            let err =
776                                err.context(format!("BFT failed to receive the callback for round {anchor_round}"));
777                            error!("{}", flatten_error(err));
778                            return Ok(());
779                        }
780                    }
781                }
782
783                info!(
784                    "Committing a subDAG with anchor round {anchor_round} and {num_transmissions} transmissions: {subdag_metadata:?} (syncing={IS_SYNCING})"
785                );
786            }
787
788            // Update the DAG, as the subdag was successfully included into a block.
789            {
790                let mut dag_write = self.dag.write();
791                let mut count = 0;
792                for certificate in commit_subdag.values().flatten() {
793                    dag_write.commit(certificate, self.storage().max_gc_rounds());
794                    count += 1;
795                }
796
797                trace!("Committed {count} certificates to the DAG");
798            }
799
800            // Update the validator telemetry.
801            #[cfg(feature = "telemetry")]
802            self.primary().gateway().validator_telemetry().insert_subdag(&Subdag::from(commit_subdag)?);
803        }
804
805        // Perform garbage collection based on the latest committed leader round.
806        // The protocol guarantees that validators commit the same anchors in the same order,
807        // but they may do so in different chunks of anchors,
808        // where 'chunk' refers to the vector of certificates that the loop just above iterates over.
809        // Doing garbage collection at the end of each chunk (as we do here),
810        // as opposed to after each certificate in the chunk (if we moved this call inside the loop, at the end),
        // may give rise to a discrepancy between the DAGs of different validators who commit different chunks:
812        // one validator may have more certificates than the other, not yet garbage collected.
813        // However, when `order_dag_with_dfs()` collects the sub-DAG to commit from an anchor,
814        // it excludes certificates that are below the GC round,
815        // so the possible aforementioned discrepancy between DAGs should not affect the consensus.
816        // That exclusion in `order_dag_with_dfs()` is critical to prevent forking,
817        // so long as garbage collection is done after each chunk.
818        // If garbage collection were done after each committed certificate,
819        // that exclusion in `order_dag_with_dfs()` should be unnecessary.
820        self.storage().garbage_collect_certificates(latest_leader_round);
821
822        #[cfg(feature = "metrics")]
823        metrics::histogram(metrics::bft::COMMIT_LEADER_CERTIFICATE_LATENCY, start.elapsed().as_secs_f64());
824        Ok(())
825    }
826
    /// Returns the subdag of batch certificates to commit.
    ///
    /// Starting from `leader_certificate`, this walks backwards through the previous certificate IDs
    /// using an explicit stack (depth-first), and collects every certificate it reaches into a map
    /// keyed by round. A previous certificate is not followed if it was already reached in this traversal,
    /// if the DAG reports it as recently committed, or - when `ALLOW_LEDGER_ACCESS` is `true` - if the
    /// ledger reports that it already contains the certificate.
    ///
    /// The traversal does not descend from a certificate whose previous round is at or below the GC boundary
    /// (derived from the maximum GC rounds of the storage and the last committed round of the DAG),
    /// and any round at or below that boundary is removed from the result before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if a previous certificate that needs to be visited
    /// is found neither in the DAG nor in the storage.
    fn order_dag_with_dfs<const ALLOW_LEDGER_ACCESS: bool>(
        &self,
        leader_certificate: BatchCertificate<N>,
    ) -> Result<BTreeMap<u64, IndexSet<BatchCertificate<N>>>> {
        // Initialize a map for the certificates to commit (round => certificates, in insertion order).
        let mut commit = BTreeMap::<u64, IndexSet<_>>::new();
        // Initialize a set for the IDs of the certificates that were already pushed onto the buffer.
        let mut already_ordered = HashSet::new();
        // Initialize a buffer for the certificates to order. It is used as a stack, seeded with the leader certificate.
        let mut buffer = vec![leader_certificate];
        // Iterate over the certificates to order.
        while let Some(certificate) = buffer.pop() {
            // Insert the certificate into the map.
            commit.entry(certificate.round()).or_default().insert(certificate.clone());

            // Check if the previous certificate is below the GC round.
            // This is currently a critical check to prevent forking,
            // as explained in the comment at the end of `commit_leader_certificate()`,
            // just before the call to garbage collection.
            let previous_round = certificate.round().saturating_sub(1);
            if previous_round + self.storage().max_gc_rounds() <= self.dag.read().last_committed_round() {
                continue;
            }
            // Iterate over the previous certificate IDs.
            // Note: Using '.rev()' ensures we remain order-preserving (i.e. "left-to-right" on each level),
            // because this 'while' loop uses 'pop()' to retrieve the next certificate to order.
            for previous_certificate_id in certificate.previous_certificate_ids().iter().rev() {
                // If the previous certificate is already ordered, continue.
                if already_ordered.contains(previous_certificate_id) {
                    continue;
                }
                // If the previous certificate was recently committed, continue.
                if self.dag.read().is_recently_committed(previous_round, *previous_certificate_id) {
                    continue;
                }
                // If the previous certificate already exists in the ledger, continue.
                // Note: On failure to read from the ledger, the certificate is treated as absent from the ledger,
                // and the traversal proceeds to look it up below.
                if ALLOW_LEDGER_ACCESS && self.ledger().contains_certificate(previous_certificate_id).unwrap_or(false) {
                    continue;
                }

                // Retrieve the previous certificate.
                let previous_certificate = {
                    // Start by retrieving the previous certificate from the DAG.
                    match self.dag.read().get_certificate_for_round_with_id(previous_round, *previous_certificate_id) {
                        // If the previous certificate is found, return it.
                        Some(previous_certificate) => previous_certificate,
                        // If the previous certificate is not found, retrieve it from the storage.
                        None => match self.storage().get_certificate(*previous_certificate_id) {
                            // If the previous certificate is found, return it.
                            Some(previous_certificate) => previous_certificate,
                            // Otherwise, the previous certificate is missing, and throw an error.
                            None => bail!(
                                "Missing previous certificate {} for round {previous_round}",
                                fmt_id(previous_certificate_id)
                            ),
                        },
                    }
                };
                // Insert the previous certificate into the set of already ordered certificates.
                // Note: This happens when the certificate is pushed (not when it is popped),
                // so that a certificate referenced by several certificates is only pushed once.
                already_ordered.insert(previous_certificate.id());
                // Insert the previous certificate into the buffer.
                buffer.push(previous_certificate);
            }
        }
        // Ensure we only retain certificates that are above the GC round.
        commit.retain(|round, _| round + self.storage().max_gc_rounds() > self.dag.read().last_committed_round());
        // Return the certificates to commit.
        Ok(commit)
    }
897
898    /// Returns `true` if there is a path from the previous certificate to the current certificate.
899    fn is_linked(
900        &self,
901        previous_certificate: BatchCertificate<N>,
902        current_certificate: BatchCertificate<N>,
903    ) -> Result<bool> {
904        // Initialize the list containing the traversal.
905        let mut traversal = vec![current_certificate.clone()];
906        // Iterate over the rounds from the current certificate to the previous certificate.
907        for round in (previous_certificate.round()..current_certificate.round()).rev() {
908            // Retrieve all of the certificates for this past round.
909            let Some(certificates) = self.dag.read().get_certificates_for_round(round) else {
910                // This is a critical error, as the traversal should have these certificates.
911                // If this error is hit, it is likely that the maximum GC rounds should be increased.
912                bail!("BFT failed to retrieve the certificates for past round {round}");
913            };
914            // Filter the certificates to only include those that are in the traversal.
915            traversal = certificates
916                .into_values()
917                .filter(|p| traversal.iter().any(|c| c.previous_certificate_ids().contains(&p.id())))
918                .collect();
919        }
920        Ok(traversal.contains(&previous_certificate))
921    }
922}
923
924impl<N: Network> BFT<N> {
925    /// Shuts down the BFT.
926    pub async fn shut_down(&self) {
927        info!("Shutting down the BFT...");
928        // Acquire the lock.
929        let _lock = self.lock.lock().await;
930        // Shut down the primary.
931        self.primary.shut_down().await;
932    }
933}
934
935#[cfg(test)]
936mod tests {
937    use crate::{
938        BFT,
939        MAX_LEADER_CERTIFICATE_DELAY_IN_SECS,
940        helpers::{Storage, dag::test_helpers::mock_dag_with_modified_last_committed_round},
941        sync::SyncCallback,
942    };
943
944    use snarkos_account::Account;
945    use snarkos_node_bft_ledger_service::{LedgerService, MockLedgerService};
946    use snarkos_node_bft_storage_service::BFTMemoryService;
947    use snarkos_node_sync::BlockSync;
948    use snarkos_utilities::NodeDataDir;
949
950    use snarkvm::{
951        console::account::{Address, PrivateKey},
952        ledger::{
953            committee::{
954                Committee,
955                test_helpers::{sample_committee, sample_committee_for_round, sample_committee_for_round_and_members},
956            },
957            narwhal::{
958                BatchCertificate,
959                batch_certificate::test_helpers::{
960                    sample_batch_certificate,
961                    sample_batch_certificate_for_round,
962                    sample_batch_certificate_for_round_with_committee,
963                },
964            },
965        },
966        utilities::TestRng,
967    };
968
969    use anyhow::Result;
970    use indexmap::{IndexMap, IndexSet};
971    use std::sync::Arc;
972
973    type CurrentNetwork = snarkvm::console::network::MainnetV0;
974
975    /// Samples a new test instance, with an optional committee round and the given maximum GC rounds.
976    fn sample_test_instance(
977        committee_round: Option<u64>,
978        max_gc_rounds: u64,
979        rng: &mut TestRng,
980    ) -> (
981        Committee<CurrentNetwork>,
982        Account<CurrentNetwork>,
983        Arc<MockLedgerService<CurrentNetwork>>,
984        Storage<CurrentNetwork>,
985    ) {
986        let committee = match committee_round {
987            Some(round) => sample_committee_for_round(round, rng),
988            None => sample_committee(rng),
989        };
990        let account = Account::new(rng).unwrap();
991        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
992        let transmissions = Arc::new(BFTMemoryService::new());
993        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
994
995        (committee, account, ledger, storage)
996    }
997
998    // Helper function to set up BFT for testing.
999    fn initialize_bft(
1000        account: Account<CurrentNetwork>,
1001        storage: Storage<CurrentNetwork>,
1002        ledger: Arc<MockLedgerService<CurrentNetwork>>,
1003    ) -> anyhow::Result<BFT<CurrentNetwork>> {
1004        // Create the block synchronization logic.
1005        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
1006        // Initialize the BFT.
1007        BFT::new(
1008            account.clone(),
1009            storage.clone(),
1010            ledger.clone(),
1011            block_sync,
1012            None,
1013            &[],
1014            false,
1015            NodeDataDir::new_test(None),
1016            None,
1017        )
1018    }
1019
1020    #[test]
1021    #[tracing_test::traced_test]
1022    fn test_is_leader_quorum_odd() -> Result<()> {
1023        let rng = &mut TestRng::default();
1024
1025        // Sample batch certificates.
1026        let mut certificates = IndexSet::new();
1027        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1028        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1029        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1030        certificates.insert(snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_for_round_with_previous_certificate_ids(1, IndexSet::new(), rng));
1031
1032        // Initialize the committee.
1033        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1034            1,
1035            vec![
1036                certificates[0].author(),
1037                certificates[1].author(),
1038                certificates[2].author(),
1039                certificates[3].author(),
1040            ],
1041            rng,
1042        );
1043
1044        // Initialize the ledger.
1045        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1046        // Initialize the storage.
1047        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1048        // Initialize the account.
1049        let account = Account::new(rng)?;
1050        // Initialize the BFT.
1051        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1052        assert!(bft.is_timer_expired());
1053        // Ensure this call succeeds on an odd round.
1054        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1055        // If timer has expired but quorum threshold is not reached, return 'false'.
1056        assert!(!result);
1057        // Insert certificates into storage.
1058        for certificate in certificates.iter() {
1059            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1060        }
1061        // Ensure this call succeeds on an odd round.
1062        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1063        assert!(result); // no previous leader certificate
1064        // Set the leader certificate.
1065        let leader_certificate = sample_batch_certificate(rng);
1066        *bft.leader_certificate.write() = Some(leader_certificate);
1067        // Ensure this call succeeds on an odd round.
1068        let result = bft.is_leader_quorum_or_nonleaders_available(1);
1069        assert!(result); // should now fall through to the end of function
1070
1071        Ok(())
1072    }
1073
1074    #[test]
1075    #[tracing_test::traced_test]
1076    fn test_is_leader_quorum_even_out_of_sync() -> Result<()> {
1077        let rng = &mut TestRng::default();
1078
1079        // Sample the test instance.
1080        let (committee, account, ledger, storage) = sample_test_instance(Some(1), 10, rng);
1081        assert_eq!(committee.starting_round(), 1);
1082        assert_eq!(storage.current_round(), 1);
1083        assert_eq!(storage.max_gc_rounds(), 10);
1084
1085        // Set up the BFT logic.
1086        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1087        assert!(bft.is_timer_expired());
1088
1089        // Store is at round 1, and we are checking for round 2.
1090        // Ensure this call fails on an even round.
1091        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1092        assert!(!result);
1093        Ok(())
1094    }
1095
1096    #[test]
1097    #[tracing_test::traced_test]
1098    fn test_is_leader_quorum_even() -> Result<()> {
1099        let rng = &mut TestRng::default();
1100
1101        // Sample the test instance.
1102        let (committee, account, ledger, storage) = sample_test_instance(Some(2), 10, rng);
1103        assert_eq!(committee.starting_round(), 2);
1104        assert_eq!(storage.current_round(), 2);
1105        assert_eq!(storage.max_gc_rounds(), 10);
1106
1107        // Set up the BFT logic.
1108        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1109        assert!(bft.is_timer_expired());
1110
1111        // Ensure this call fails on an even round.
1112        let result = bft.is_leader_quorum_or_nonleaders_available(2);
1113        assert!(!result);
1114        Ok(())
1115    }
1116
1117    #[test]
1118    #[tracing_test::traced_test]
1119    fn test_is_even_round_ready() -> Result<()> {
1120        let rng = &mut TestRng::default();
1121
1122        // Sample batch certificates.
1123        let mut certificates = IndexSet::new();
1124        certificates.insert(sample_batch_certificate_for_round(2, rng));
1125        certificates.insert(sample_batch_certificate_for_round(2, rng));
1126        certificates.insert(sample_batch_certificate_for_round(2, rng));
1127        certificates.insert(sample_batch_certificate_for_round(2, rng));
1128
1129        // Initialize the committee.
1130        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1131            2,
1132            vec![
1133                certificates[0].author(),
1134                certificates[1].author(),
1135                certificates[2].author(),
1136                certificates[3].author(),
1137            ],
1138            rng,
1139        );
1140
1141        // Initialize the ledger.
1142        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1143        // Initialize the storage.
1144        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1145        // Initialize the account.
1146        let account = Account::new(rng)?;
1147
1148        // Set up the BFT logic.
1149        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1150        assert!(bft.is_timer_expired());
1151
1152        // Set the leader certificate.
1153        let leader_certificate = sample_batch_certificate_for_round(2, rng);
1154        *bft.leader_certificate.write() = Some(leader_certificate);
1155        let result = bft.is_even_round_ready_for_next_round(IndexSet::new(), committee.clone(), 2);
1156        // If leader certificate is set but quorum threshold is not reached, we are not ready for the next round.
1157        assert!(!result);
1158        // Once quorum threshold is reached, we are ready for the next round.
1159        let result = bft.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1160        assert!(result);
1161
1162        // Initialize a new BFT.
1163        let bft_timer = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1164        // If the leader certificate is not set and the timer has not expired, we are not ready for the next round.
1165        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1166        if !bft_timer.is_timer_expired() {
1167            assert!(!result);
1168        }
1169        // Wait for the timer to expire.
1170        let leader_certificate_timeout =
1171            std::time::Duration::from_millis(MAX_LEADER_CERTIFICATE_DELAY_IN_SECS as u64 * 1000);
1172        std::thread::sleep(leader_certificate_timeout);
1173        // Once the leader certificate timer has expired and quorum threshold is reached, we are ready to advance to the next round.
1174        let result = bft_timer.is_even_round_ready_for_next_round(certificates.clone(), committee.clone(), 2);
1175        if bft_timer.is_timer_expired() {
1176            assert!(result);
1177        } else {
1178            assert!(!result);
1179        }
1180
1181        Ok(())
1182    }
1183
1184    #[test]
1185    #[tracing_test::traced_test]
1186    fn test_update_leader_certificate_odd() -> Result<()> {
1187        let rng = &mut TestRng::default();
1188
1189        // Sample the test instance.
1190        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1191        assert_eq!(storage.max_gc_rounds(), 10);
1192
1193        // Initialize the BFT.
1194        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1195        assert!(bft.is_timer_expired());
1196
1197        // Ensure this call fails on an odd round.
1198        let result = bft.update_leader_certificate_to_even_round(1);
1199        assert!(!result);
1200        Ok(())
1201    }
1202
1203    #[test]
1204    #[tracing_test::traced_test]
1205    fn test_update_leader_certificate_bad_round() -> Result<()> {
1206        let rng = &mut TestRng::default();
1207
1208        // Sample the test instance.
1209        let (_, account, ledger, storage) = sample_test_instance(None, 10, rng);
1210        assert_eq!(storage.max_gc_rounds(), 10);
1211
1212        // Initialize the BFT.
1213        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1214
1215        // Ensure this call succeeds on an even round.
1216        let result = bft.update_leader_certificate_to_even_round(6);
1217        assert!(!result);
1218        Ok(())
1219    }
1220
1221    #[test]
1222    #[tracing_test::traced_test]
1223    fn test_update_leader_certificate_even() -> Result<()> {
1224        let rng = &mut TestRng::default();
1225
1226        // Set the current round.
1227        let current_round = 3;
1228
1229        // Sample the certificates.
1230        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1231            current_round,
1232            rng,
1233        );
1234
1235        // Initialize the committee.
1236        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1237            2,
1238            vec![
1239                certificates[0].author(),
1240                certificates[1].author(),
1241                certificates[2].author(),
1242                certificates[3].author(),
1243            ],
1244            rng,
1245        );
1246
1247        // Initialize the ledger.
1248        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1249
1250        // Initialize the storage.
1251        let transmissions = Arc::new(BFTMemoryService::new());
1252        let storage = Storage::new(ledger.clone(), transmissions, 10);
1253        storage.testing_only_insert_certificate_testing_only(certificates[0].clone());
1254        storage.testing_only_insert_certificate_testing_only(certificates[1].clone());
1255        storage.testing_only_insert_certificate_testing_only(certificates[2].clone());
1256        storage.testing_only_insert_certificate_testing_only(certificates[3].clone());
1257        assert_eq!(storage.current_round(), 2);
1258
1259        // Retrieve the leader certificate.
1260        let leader = committee.get_leader(2).unwrap();
1261        let leader_certificate = storage.get_certificate_for_round_with_author(2, leader).unwrap();
1262
1263        // Initialize the BFT.
1264        let account = Account::new(rng)?;
1265        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1266
1267        // Set the leader certificate.
1268        *bft.leader_certificate.write() = Some(leader_certificate);
1269
1270        // Update the leader certificate.
1271        // Ensure this call succeeds on an even round.
1272        let result = bft.update_leader_certificate_to_even_round(2);
1273        assert!(result);
1274
1275        Ok(())
1276    }
1277
1278    #[tokio::test]
1279    #[tracing_test::traced_test]
1280    async fn test_order_dag_with_dfs() -> Result<()> {
1281        let rng = &mut TestRng::default();
1282
1283        // Sample the test instance.
1284        let (_, account, ledger, _) = sample_test_instance(Some(1), 10, rng);
1285
1286        // Initialize the round parameters.
1287        let previous_round = 2; // <- This must be an even number, for `BFT::update_dag` to behave correctly below.
1288        let current_round = previous_round + 1;
1289
1290        // Sample the current certificate and previous certificates.
1291        let (certificate, previous_certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1292            current_round,
1293            rng,
1294        );
1295
1296        /* Test GC */
1297
1298        // Ensure the function succeeds in returning only certificates above GC.
1299        {
1300            // Initialize the storage.
1301            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1302            // Initialize the BFT.
1303            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1304
1305            // Insert a mock DAG in the BFT.
1306            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(3);
1307
1308            // Insert the previous certificates into the BFT.
1309            for certificate in previous_certificates.clone() {
1310                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1311            }
1312
1313            // Ensure this call succeeds and returns all given certificates.
1314            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1315            assert!(result.is_ok());
1316            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1317            assert_eq!(candidate_certificates.len(), 1);
1318            let expected_certificates = vec![certificate.clone()];
1319            assert_eq!(
1320                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1321                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1322            );
1323            assert_eq!(candidate_certificates, expected_certificates);
1324        }
1325
1326        /* Test normal case */
1327
1328        // Ensure the function succeeds in returning all given certificates.
1329        {
1330            // Initialize the storage.
1331            let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 1);
1332            // Initialize the BFT.
1333            let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1334
1335            // Insert a mock DAG in the BFT.
1336            *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(2);
1337
1338            // Insert the previous certificates into the BFT.
1339            for certificate in previous_certificates.clone() {
1340                assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1341            }
1342
1343            // Ensure this call succeeds and returns all given certificates.
1344            let result = bft.order_dag_with_dfs::<false>(certificate.clone());
1345            assert!(result.is_ok());
1346            let candidate_certificates = result.unwrap().into_values().flatten().collect::<Vec<_>>();
1347            assert_eq!(candidate_certificates.len(), 5);
1348            let expected_certificates = vec![
1349                previous_certificates[0].clone(),
1350                previous_certificates[1].clone(),
1351                previous_certificates[2].clone(),
1352                previous_certificates[3].clone(),
1353                certificate,
1354            ];
1355            assert_eq!(
1356                candidate_certificates.iter().map(|c| c.id()).collect::<Vec<_>>(),
1357                expected_certificates.iter().map(|c| c.id()).collect::<Vec<_>>()
1358            );
1359            assert_eq!(candidate_certificates, expected_certificates);
1360        }
1361
1362        Ok(())
1363    }
1364
#[test]
#[tracing_test::traced_test]
fn test_order_dag_with_dfs_fails_on_missing_previous_certificate() -> Result<()> {
    // Short local alias for the long test-helper path used below.
    use snarkvm::ledger::narwhal::batch_certificate::test_helpers::{
        sample_batch_certificate_with_previous_certificates as sample_certificate_with_parents,
    };

    let rng = &mut TestRng::default();

    // Sample the test instance and sanity-check its starting parameters.
    let (committee, account, ledger, storage) = sample_test_instance(Some(1), 1, rng);
    assert_eq!(committee.starting_round(), 1);
    assert_eq!(storage.current_round(), 1);
    assert_eq!(storage.max_gc_rounds(), 1);

    // The parent round must be an even number, for `BFT::update_dag` to behave correctly below.
    let previous_round = 2;
    let current_round = previous_round + 1;

    // Sample a certificate for the current round, and collect the IDs of its previous certificates.
    let (certificate, previous_certificates) = sample_certificate_with_parents(current_round, rng);
    let previous_certificate_ids = previous_certificates.iter().map(|parent| parent.id()).collect::<IndexSet<_>>();

    /* Missing previous certificate */

    // The previous certificates are never inserted into this BFT.
    let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;

    // The error is expected to name the previous certificate at index 3.
    let error_msg = format!(
        "Missing previous certificate {} for round {previous_round}",
        crate::helpers::fmt_id(previous_certificate_ids[3]),
    );

    // Ordering must fail, and with exactly the expected message.
    let result = bft.order_dag_with_dfs::<false>(certificate);
    assert!(result.is_err());
    let error = result.unwrap_err();
    assert_eq!(error.to_string(), error_msg);

    Ok(())
}
1405
1406    #[tokio::test]
1407    async fn test_bft_gc_on_commit() -> Result<()> {
1408        let rng = &mut TestRng::default();
1409
1410        // Initialize the round parameters.
1411        let max_gc_rounds = 1;
1412        let committee_round = 0;
1413        let commit_round = 2;
1414        let current_round = commit_round + 1;
1415
1416        // Sample the certificates.
1417        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1418            current_round,
1419            rng,
1420        );
1421
1422        // Initialize the committee.
1423        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1424            committee_round,
1425            vec![
1426                certificates[0].author(),
1427                certificates[1].author(),
1428                certificates[2].author(),
1429                certificates[3].author(),
1430            ],
1431            rng,
1432        );
1433
1434        // Initialize the ledger.
1435        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1436
1437        // Initialize the storage.
1438        let transmissions = Arc::new(BFTMemoryService::new());
1439        let storage = Storage::new(ledger.clone(), transmissions, max_gc_rounds);
1440        // Insert the certificates into the storage.
1441        for certificate in certificates.iter() {
1442            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1443        }
1444
1445        // Get the leader certificate.
1446        let leader = committee.get_leader(commit_round).unwrap();
1447        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1448
1449        // Initialize the BFT.
1450        let account = Account::new(rng)?;
1451        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1452
1453        // Create an empty mock DAG with last committed round set to `commit_round`.
1454        *bft.dag.write() = mock_dag_with_modified_last_committed_round(commit_round);
1455
1456        // Ensure that the `gc_round` has not been updated yet.
1457        assert_eq!(bft.storage().gc_round(), committee_round.saturating_sub(max_gc_rounds));
1458
1459        // Insert the certificates into the BFT.
1460        for certificate in certificates {
1461            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1462        }
1463
1464        // Commit the leader certificate.
1465        bft.commit_leader_certificate::<false, false>(leader_certificate).await.unwrap();
1466
1467        // Ensure that the `gc_round` has been updated.
1468        assert_eq!(bft.storage().gc_round(), commit_round - max_gc_rounds);
1469
1470        Ok(())
1471    }
1472
1473    #[tokio::test]
1474    #[tracing_test::traced_test]
1475    async fn test_sync_bft_dag_at_bootup() -> Result<()> {
1476        let rng = &mut TestRng::default();
1477
1478        // Initialize the round parameters.
1479        let max_gc_rounds = 1;
1480        let committee_round = 0;
1481        let commit_round = 2;
1482        let current_round = commit_round + 1;
1483
1484        // Sample the current certificate and previous certificates.
1485        let (_, certificates) = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate_with_previous_certificates(
1486            current_round,
1487            rng,
1488        );
1489
1490        // Initialize the committee.
1491        let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1492            committee_round,
1493            vec![
1494                certificates[0].author(),
1495                certificates[1].author(),
1496                certificates[2].author(),
1497                certificates[3].author(),
1498            ],
1499            rng,
1500        );
1501
1502        // Initialize the ledger.
1503        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1504
1505        // Initialize the storage.
1506        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1507        // Insert the certificates into the storage.
1508        for certificate in certificates.iter() {
1509            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1510        }
1511
1512        // Get the leader certificate.
1513        let leader = committee.get_leader(commit_round).unwrap();
1514        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1515
1516        // Initialize the BFT.
1517        let account = Account::new(rng)?;
1518        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1519
1520        // Insert a mock DAG in the BFT.
1521        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(commit_round);
1522
1523        // Insert the previous certificates into the BFT.
1524        for certificate in certificates.clone() {
1525            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1526        }
1527
1528        // Commit the leader certificate.
1529        bft.commit_leader_certificate::<false, false>(leader_certificate.clone()).await.unwrap();
1530
1531        // Simulate a bootup of the BFT.
1532
1533        // Initialize a new instance of storage.
1534        let storage_2 = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1535        // Initialize a new instance of BFT.
1536        let bootup_bft = initialize_bft(account.clone(), storage_2, ledger)?;
1537
1538        // Sync the BFT DAG at bootup.
1539        bootup_bft.sync_dag_at_bootup(certificates.clone()).await;
1540
1541        // Check that the BFT starts from the same last committed round.
1542        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1543
1544        // Ensure that both BFTs have committed the leader certificate.
1545        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1546        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1547
1548        // Check the state of the bootup BFT.
1549        for certificate in certificates {
1550            let certificate_round = certificate.round();
1551            let certificate_id = certificate.id();
1552            // Check that the bootup BFT has committed the certificates.
1553            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1554            // Check that the bootup BFT does not contain the certificates in its graph, because
1555            // it should not need to order them again in subsequent subdags.
1556            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1557        }
1558
1559        Ok(())
1560    }
1561
1562    #[tokio::test]
1563    #[tracing_test::traced_test]
1564    async fn test_sync_bft_dag_at_bootup_shutdown() -> Result<()> {
1565        /*
1566        1. Run one uninterrupted BFT on a set of certificates for 2 leader commits.
1567        2. Run a separate bootup BFT that syncs with a set of pre shutdown certificates, and then commits a second leader normally over a set of post shutdown certificates.
1568        3. Observe that the uninterrupted BFT and the bootup BFT end in the same state.
1569        */
1570
1571        let rng = &mut TestRng::default();
1572
1573        // Initialize the round parameters.
1574        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1575        let committee_round = 0;
1576        let commit_round = 2;
1577        let current_round = commit_round + 1;
1578        let next_round = current_round + 1;
1579
1580        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1581        let (round_to_certificates_map, committee) = {
1582            let private_keys = [
1583                PrivateKey::new(rng).unwrap(),
1584                PrivateKey::new(rng).unwrap(),
1585                PrivateKey::new(rng).unwrap(),
1586                PrivateKey::new(rng).unwrap(),
1587            ];
1588            let addresses = vec![
1589                Address::try_from(private_keys[0])?,
1590                Address::try_from(private_keys[1])?,
1591                Address::try_from(private_keys[2])?,
1592                Address::try_from(private_keys[3])?,
1593            ];
1594            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1595                committee_round,
1596                addresses,
1597                rng,
1598            );
1599            // Initialize a mapping from the round number to the set of batch certificates in the round.
1600            let mut round_to_certificates_map: IndexMap<
1601                u64,
1602                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1603            > = IndexMap::new();
1604            let mut previous_certificates = IndexSet::with_capacity(4);
1605            // Initialize the genesis batch certificates.
1606            for _ in 0..4 {
1607                previous_certificates.insert(sample_batch_certificate(rng));
1608            }
1609            for round in 0..commit_round + 3 {
1610                let mut current_certificates = IndexSet::new();
1611                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1612                    IndexSet::new()
1613                } else {
1614                    previous_certificates.iter().map(|c| c.id()).collect()
1615                };
1616                let transmission_ids =
1617                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1618                        .into_iter()
1619                        .collect::<IndexSet<_>>();
1620                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1621                let committee_id = committee.id();
1622                for (i, private_key_1) in private_keys.iter().enumerate() {
1623                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1624                        private_key_1,
1625                        round,
1626                        timestamp,
1627                        committee_id,
1628                        transmission_ids.clone(),
1629                        previous_certificate_ids.clone(),
1630                        rng,
1631                    )
1632                    .unwrap();
1633                    let mut signatures = IndexSet::with_capacity(4);
1634                    for (j, private_key_2) in private_keys.iter().enumerate() {
1635                        if i != j {
1636                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1637                        }
1638                    }
1639                    let certificate =
1640                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1641                    current_certificates.insert(certificate);
1642                }
1643                // Update the mapping.
1644                round_to_certificates_map.insert(round, current_certificates.clone());
1645                previous_certificates = current_certificates.clone();
1646            }
1647            (round_to_certificates_map, committee)
1648        };
1649
1650        // Initialize the ledger.
1651        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1652        // Initialize the storage.
1653        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1654        // Get the leaders for the next 2 commit rounds.
1655        let leader = committee.get_leader(commit_round).unwrap();
1656        let next_leader = committee.get_leader(next_round).unwrap();
1657        // Insert the pre shutdown certificates into the storage.
1658        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1659        for i in 1..=commit_round {
1660            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1661            if i == commit_round {
1662                // Only insert the leader certificate for the commit round.
1663                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1664                if let Some(c) = leader_certificate {
1665                    pre_shutdown_certificates.push(c.clone());
1666                }
1667                continue;
1668            }
1669            pre_shutdown_certificates.extend(certificates);
1670        }
1671        for certificate in pre_shutdown_certificates.iter() {
1672            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1673        }
1674        // Insert the post shutdown certificates into the storage.
1675        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1676            Vec::new();
1677        for j in commit_round..=commit_round + 2 {
1678            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1679            post_shutdown_certificates.extend(certificate);
1680        }
1681        for certificate in post_shutdown_certificates.iter() {
1682            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1683        }
1684        // Get the leader certificates.
1685        let leader_certificate = storage.get_certificate_for_round_with_author(commit_round, leader).unwrap();
1686        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1687
1688        // Initialize the BFT without bootup.
1689        let account = Account::new(rng)?;
1690        let bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1691
1692        // Insert a mock DAG in the BFT without bootup.
1693        *bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1694
1695        // Insert the certificates into the BFT without bootup.
1696        for certificate in pre_shutdown_certificates.clone() {
1697            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1698        }
1699
1700        // Insert the post shutdown certificates into the BFT without bootup.
1701        for certificate in post_shutdown_certificates.clone() {
1702            assert!(bft.update_dag::<false, false>(certificate).await.is_ok());
1703        }
1704        // Commit the second leader certificate.
1705        let commit_subdag = bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1706        let commit_subdag_metadata = commit_subdag.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1707        bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1708
1709        // Simulate a bootup of the BFT.
1710
1711        // Initialize a new instance of storage.
1712        let bootup_storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1713
1714        // Initialize a new instance of BFT with bootup.
1715        let bootup_bft = initialize_bft(account.clone(), bootup_storage.clone(), ledger.clone())?;
1716
1717        // Sync the BFT DAG at bootup.
1718        bootup_bft.sync_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1719
1720        // Insert the post shutdown certificates to the storage and BFT with bootup.
1721        for certificate in post_shutdown_certificates.iter() {
1722            bootup_bft.storage().testing_only_insert_certificate_testing_only(certificate.clone());
1723        }
1724        for certificate in post_shutdown_certificates.clone() {
1725            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1726        }
1727        // Commit the second leader certificate.
1728        let commit_subdag_bootup = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate.clone()).unwrap();
1729        let commit_subdag_metadata_bootup =
1730            commit_subdag_bootup.iter().map(|(round, c)| (*round, c.len())).collect::<Vec<_>>();
1731        let committed_certificates_bootup = commit_subdag_bootup.values().flatten();
1732        bootup_bft.commit_leader_certificate::<false, false>(next_leader_certificate.clone()).await.unwrap();
1733
1734        // Check that the final state of both BFTs is the same.
1735
1736        // Check that both BFTs start from the same last committed round.
1737        assert_eq!(bft.dag.read().last_committed_round(), bootup_bft.dag.read().last_committed_round());
1738
1739        // Ensure that both BFTs have committed the leader certificates.
1740        assert!(bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1741        assert!(bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id()));
1742        assert!(bootup_bft.dag.read().is_recently_committed(leader_certificate.round(), leader_certificate.id()));
1743        assert!(
1744            bootup_bft.dag.read().is_recently_committed(next_leader_certificate.round(), next_leader_certificate.id())
1745        );
1746
1747        // Check that the bootup BFT has committed the pre shutdown certificates.
1748        for certificate in pre_shutdown_certificates.clone() {
1749            let certificate_round = certificate.round();
1750            let certificate_id = certificate.id();
1751            // Check that both BFTs have committed the certificates.
1752            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1753            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1754            // Check that the bootup BFT does not contain the certificates in its graph, because
1755            // it should not need to order them again in subsequent subdags.
1756            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1757            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1758        }
1759
1760        // Check that that the bootup BFT has committed the subdag stemming from the second leader certificate in consensus.
1761        for certificate in committed_certificates_bootup.clone() {
1762            let certificate_round = certificate.round();
1763            let certificate_id = certificate.id();
1764            // Check that the both BFTs have committed the certificates.
1765            assert!(bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1766            assert!(bootup_bft.dag.read().is_recently_committed(certificate_round, certificate_id));
1767            // Check that the bootup BFT does not contain the certificates in its graph, because
1768            // it should not need to order them again in subsequent subdags.
1769            assert!(!bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1770            assert!(!bootup_bft.dag.read().contains_certificate_in_round(certificate_round, certificate_id));
1771        }
1772
1773        // Check that the commit subdag metadata for the second leader is the same for both BFTs.
1774        assert_eq!(commit_subdag_metadata_bootup, commit_subdag_metadata);
1775
1776        Ok(())
1777    }
1778
1779    #[tokio::test]
1780    #[tracing_test::traced_test]
1781    async fn test_sync_bft_dag_at_bootup_dfs() -> Result<()> {
1782        /*
1783        1. Run a bootup BFT that syncs with a set of pre shutdown certificates.
1784        2. Add post shutdown certificates to the bootup BFT.
1785        2. Observe that in the commit subdag of the second leader certificate, there are no repeated vertices from the pre shutdown certificates.
1786        */
1787
1788        let rng = &mut TestRng::default();
1789
1790        // Initialize the round parameters.
1791        let max_gc_rounds = snarkvm::ledger::narwhal::BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64;
1792        let committee_round = 0;
1793        let commit_round = 2;
1794        let current_round = commit_round + 1;
1795        let next_round = current_round + 1;
1796
1797        // Sample 5 rounds of batch certificates starting at the genesis round from a static set of 4 authors.
1798        let (round_to_certificates_map, committee) = {
1799            let private_keys = [
1800                PrivateKey::new(rng).unwrap(),
1801                PrivateKey::new(rng).unwrap(),
1802                PrivateKey::new(rng).unwrap(),
1803                PrivateKey::new(rng).unwrap(),
1804            ];
1805            let addresses = vec![
1806                Address::try_from(private_keys[0])?,
1807                Address::try_from(private_keys[1])?,
1808                Address::try_from(private_keys[2])?,
1809                Address::try_from(private_keys[3])?,
1810            ];
1811            let committee = snarkvm::ledger::committee::test_helpers::sample_committee_for_round_and_members(
1812                committee_round,
1813                addresses,
1814                rng,
1815            );
1816            // Initialize a mapping from the round number to the set of batch certificates in the round.
1817            let mut round_to_certificates_map: IndexMap<
1818                u64,
1819                IndexSet<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>>,
1820            > = IndexMap::new();
1821            let mut previous_certificates = IndexSet::with_capacity(4);
1822            // Initialize the genesis batch certificates.
1823            for _ in 0..4 {
1824                previous_certificates.insert(sample_batch_certificate(rng));
1825            }
1826            for round in 0..=commit_round + 2 {
1827                let mut current_certificates = IndexSet::new();
1828                let previous_certificate_ids: IndexSet<_> = if round == 0 || round == 1 {
1829                    IndexSet::new()
1830                } else {
1831                    previous_certificates.iter().map(|c| c.id()).collect()
1832                };
1833                let transmission_ids =
1834                    snarkvm::ledger::narwhal::transmission_id::test_helpers::sample_transmission_ids(rng)
1835                        .into_iter()
1836                        .collect::<IndexSet<_>>();
1837                let timestamp = time::OffsetDateTime::now_utc().unix_timestamp();
1838                let committee_id = committee.id();
1839                for (i, private_key_1) in private_keys.iter().enumerate() {
1840                    let batch_header = snarkvm::ledger::narwhal::BatchHeader::new(
1841                        private_key_1,
1842                        round,
1843                        timestamp,
1844                        committee_id,
1845                        transmission_ids.clone(),
1846                        previous_certificate_ids.clone(),
1847                        rng,
1848                    )
1849                    .unwrap();
1850                    let mut signatures = IndexSet::with_capacity(4);
1851                    for (j, private_key_2) in private_keys.iter().enumerate() {
1852                        if i != j {
1853                            signatures.insert(private_key_2.sign(&[batch_header.batch_id()], rng).unwrap());
1854                        }
1855                    }
1856                    let certificate =
1857                        snarkvm::ledger::narwhal::BatchCertificate::from(batch_header, signatures).unwrap();
1858                    current_certificates.insert(certificate);
1859                }
1860                // Update the mapping.
1861                round_to_certificates_map.insert(round, current_certificates.clone());
1862                previous_certificates = current_certificates.clone();
1863            }
1864            (round_to_certificates_map, committee)
1865        };
1866
1867        // Initialize the ledger.
1868        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1869        // Initialize the storage.
1870        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1871        // Get the leaders for the next 2 commit rounds.
1872        let leader = committee.get_leader(commit_round).unwrap();
1873        let next_leader = committee.get_leader(next_round).unwrap();
1874        // Insert the pre shutdown certificates into the storage.
1875        let mut pre_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> = Vec::new();
1876        for i in 1..=commit_round {
1877            let certificates = (*round_to_certificates_map.get(&i).unwrap()).clone();
1878            if i == commit_round {
1879                // Only insert the leader certificate for the commit round.
1880                let leader_certificate = certificates.iter().find(|certificate| certificate.author() == leader);
1881                if let Some(c) = leader_certificate {
1882                    pre_shutdown_certificates.push(c.clone());
1883                }
1884                continue;
1885            }
1886            pre_shutdown_certificates.extend(certificates);
1887        }
1888        for certificate in pre_shutdown_certificates.iter() {
1889            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1890        }
1891        // Initialize the bootup BFT.
1892        let account = Account::new(rng)?;
1893        let bootup_bft = initialize_bft(account.clone(), storage.clone(), ledger.clone())?;
1894
1895        // Insert a mock DAG in the BFT without bootup.
1896        *bootup_bft.dag.write() = crate::helpers::dag::test_helpers::mock_dag_with_modified_last_committed_round(0);
1897        // Sync the BFT DAG at bootup.
1898        bootup_bft.sync_dag_at_bootup(pre_shutdown_certificates.clone()).await;
1899
1900        // Insert the post shutdown certificates into the storage.
1901        let mut post_shutdown_certificates: Vec<snarkvm::ledger::narwhal::BatchCertificate<CurrentNetwork>> =
1902            Vec::new();
1903        for j in commit_round..=commit_round + 2 {
1904            let certificate = (*round_to_certificates_map.get(&j).unwrap()).clone();
1905            post_shutdown_certificates.extend(certificate);
1906        }
1907        for certificate in post_shutdown_certificates.iter() {
1908            storage.testing_only_insert_certificate_testing_only(certificate.clone());
1909        }
1910
1911        // Insert the post shutdown certificates into the DAG.
1912        for certificate in post_shutdown_certificates.clone() {
1913            assert!(bootup_bft.update_dag::<false, false>(certificate).await.is_ok());
1914        }
1915
1916        // Get the next leader certificate to commit.
1917        let next_leader_certificate = storage.get_certificate_for_round_with_author(next_round, next_leader).unwrap();
1918        let commit_subdag = bootup_bft.order_dag_with_dfs::<false>(next_leader_certificate).unwrap();
1919        let committed_certificates = commit_subdag.values().flatten();
1920
1921        // Check that none of the certificates synced from the bootup appear in the subdag for the next commit round.
1922        for pre_shutdown_certificate in pre_shutdown_certificates.clone() {
1923            for committed_certificate in committed_certificates.clone() {
1924                assert_ne!(pre_shutdown_certificate.id(), committed_certificate.id());
1925            }
1926        }
1927        Ok(())
1928    }
1929
1930    /// Tests that a leader certificate can be committed by sufficient endorsements in a succeeding leader certificate.
1931    #[test_log::test(tokio::test)]
1932    async fn test_commit_via_is_linked() {
1933        let rng = &mut TestRng::default();
1934
1935        let committee_round = 0;
1936        let leader_round_1 = 2;
1937        let leader_round_2 = 4; // subsequent even round
1938        let max_gc_rounds = 50;
1939
1940        // Create a committee with four members.
1941        let num_authors = 4;
1942        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
1943        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
1944
1945        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
1946        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
1947        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
1948        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
1949
1950        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
1951
1952        // Round 1
1953        let round1_certs: IndexSet<_> = (0..num_authors)
1954            .map(|idx| {
1955                let author = &private_keys[idx];
1956                let endorsements: Vec<_> = private_keys
1957                    .iter()
1958                    .enumerate()
1959                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
1960                    .collect();
1961
1962                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
1963            })
1964            .collect();
1965        certificates_by_round.insert(1, round1_certs.clone());
1966
1967        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
1968        let mut leader1_certificate = None;
1969
1970        let round2_certs: IndexSet<_> = (0..num_authors)
1971            .map(|idx| {
1972                let author = &private_keys[idx];
1973                let endorsements: Vec<_> = private_keys
1974                    .iter()
1975                    .enumerate()
1976                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
1977                    .collect();
1978                let cert = sample_batch_certificate_for_round_with_committee(
1979                    leader_round_1,
1980                    round1_certs.iter().map(|c| c.id()).collect(),
1981                    author,
1982                    &endorsements[..],
1983                    rng,
1984                );
1985
1986                if cert.author() == leader1 {
1987                    leader1_certificate = Some(cert.clone());
1988                }
1989                cert
1990            })
1991            .collect();
1992        certificates_by_round.insert(leader_round_1, round2_certs.clone());
1993
1994        let round3_certs: IndexSet<_> = (0..num_authors)
1995            .map(|idx| {
1996                let author = &private_keys[idx];
1997                let endorsements: Vec<_> = private_keys
1998                    .iter()
1999                    .enumerate()
2000                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2001                    .collect();
2002
2003                let previous_certificate_ids: IndexSet<_> = round2_certs
2004                    .iter()
2005                    .filter_map(|cert| {
2006                        // Only have the leader endorse the previous round's leader certificate.
2007                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2008                    })
2009                    .collect();
2010
2011                sample_batch_certificate_for_round_with_committee(
2012                    leader_round_1 + 1,
2013                    previous_certificate_ids,
2014                    author,
2015                    &endorsements[..],
2016                    rng,
2017                )
2018            })
2019            .collect();
2020        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2021
2022        // Ensure the first leader's certificate is not committed yet.
2023        let leader_certificate_1 = leader1_certificate.unwrap();
2024        assert!(
2025            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2026            "Leader certificate 1 should not be committed yet"
2027        );
2028        assert_eq!(bft.dag.read().last_committed_round(), 0);
2029
2030        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2031        let round4_certs: IndexSet<_> = (0..num_authors)
2032            .map(|idx| {
2033                let endorsements: Vec<_> = private_keys
2034                    .iter()
2035                    .enumerate()
2036                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2037                    .collect();
2038
2039                sample_batch_certificate_for_round_with_committee(
2040                    leader_round_2,
2041                    round3_certs.iter().map(|c| c.id()).collect(),
2042                    &private_keys[idx],
2043                    &endorsements[..],
2044                    rng,
2045                )
2046            })
2047            .collect();
2048        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2049
2050        // Insert all certificates into the storage and DAG.
2051        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2052            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2053            bft.update_dag::<false, false>(certificate).await.unwrap();
2054        }
2055
2056        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2057
2058        assert!(
2059            bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2060            "Leader certificate 1 should be linked to leader certificate 2"
2061        );
2062
2063        // Explicitely commit leader certificate 2.
2064        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2065
2066        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2067        assert!(
2068            bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2069            "Leader certificate for round 2 should be committed when committing at round 4"
2070        );
2071
2072        // Leader certificate 2 should be committed as the above call was successful.
2073        assert!(
2074            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2075            "Leader certificate for round 4 should be committed"
2076        );
2077
2078        assert_eq!(bft.dag.read().last_committed_round(), 4);
2079    }
2080
2081    #[test_log::test(tokio::test)]
2082    async fn test_commit_via_is_linked_with_skipped_anchor() {
2083        let rng = &mut TestRng::default();
2084
2085        let committee_round = 0;
2086        let leader_round_1 = 2;
2087        let leader_round_2 = 4;
2088        let max_gc_rounds = 50;
2089
2090        let num_authors = 4;
2091        let private_keys: Vec<_> = (0..num_authors).map(|_| PrivateKey::new(rng).unwrap()).collect();
2092        let addresses: Vec<_> = private_keys.iter().map(|pkey| Address::try_from(pkey).unwrap()).collect();
2093
2094        let committee = sample_committee_for_round_and_members(committee_round, addresses.clone(), rng);
2095        let ledger = Arc::new(MockLedgerService::new(committee.clone()));
2096        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), max_gc_rounds);
2097        let bft = initialize_bft(Account::new(rng).unwrap(), storage.clone(), ledger.clone()).unwrap();
2098
2099        let mut certificates_by_round: IndexMap<u64, IndexSet<BatchCertificate<CurrentNetwork>>> = IndexMap::new();
2100
2101        // Round 1
2102        let round1_certs: IndexSet<_> = (0..num_authors)
2103            .map(|idx| {
2104                let author = &private_keys[idx];
2105                let endorsements: Vec<_> = private_keys
2106                    .iter()
2107                    .enumerate()
2108                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2109                    .collect();
2110
2111                sample_batch_certificate_for_round_with_committee(1, IndexSet::new(), author, &endorsements[..], rng)
2112            })
2113            .collect();
2114        certificates_by_round.insert(1, round1_certs.clone());
2115
2116        let leader1 = ledger.get_committee_for_round(leader_round_1 + 1).unwrap().get_leader(leader_round_1).unwrap();
2117        let mut leader1_certificate = None;
2118
2119        let round2_certs: IndexSet<_> = (0..num_authors)
2120            .map(|idx| {
2121                let author = &private_keys[idx];
2122                let endorsements: Vec<_> = private_keys
2123                    .iter()
2124                    .enumerate()
2125                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2126                    .collect();
2127                let cert = sample_batch_certificate_for_round_with_committee(
2128                    leader_round_1,
2129                    round1_certs.iter().map(|c| c.id()).collect(),
2130                    author,
2131                    &endorsements[..],
2132                    rng,
2133                );
2134
2135                if cert.author() == leader1 {
2136                    leader1_certificate = Some(cert.clone());
2137                }
2138                cert
2139            })
2140            .collect();
2141        certificates_by_round.insert(leader_round_1, round2_certs.clone());
2142
2143        let round3_certs: IndexSet<_> = (0..num_authors)
2144            .map(|idx| {
2145                let author = &private_keys[idx];
2146                let endorsements: Vec<_> = private_keys
2147                    .iter()
2148                    .enumerate()
2149                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2150                    .collect();
2151
2152                let previous_certificate_ids: IndexSet<_> = round2_certs
2153                    .iter()
2154                    .filter_map(|cert| {
2155                        // Only have the leader endorse the previous round's leader certificate.
2156                        if cert.author() == leader1 && cert.author() != addresses[idx] { None } else { Some(cert.id()) }
2157                    })
2158                    .collect();
2159
2160                sample_batch_certificate_for_round_with_committee(
2161                    leader_round_1 + 1,
2162                    previous_certificate_ids,
2163                    author,
2164                    &endorsements[..],
2165                    rng,
2166                )
2167            })
2168            .collect();
2169        certificates_by_round.insert(leader_round_1 + 1, round3_certs.clone());
2170
2171        // Ensure the first leader's certificate is not committed yet.
2172        let leader_certificate_1 = leader1_certificate.unwrap();
2173        assert!(
2174            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2175            "Leader certificate 1 should not be committed yet"
2176        );
2177
2178        let leader2 = ledger.get_committee_for_round(leader_round_2 + 1).unwrap().get_leader(leader_round_2).unwrap();
2179        let round4_certs: IndexSet<_> = (0..num_authors)
2180            .map(|idx| {
2181                let endorsements: Vec<_> = private_keys
2182                    .iter()
2183                    .enumerate()
2184                    .filter_map(|(other_idx, pkey)| if idx == other_idx { None } else { Some(*pkey) })
2185                    .collect();
2186
2187                // Do not create a path to the previous leader certificate.
2188                let previous_certificate_ids: IndexSet<_> = round3_certs
2189                    .iter()
2190                    .filter_map(|cert| if cert.author() == leader1 { None } else { Some(cert.id()) })
2191                    .collect();
2192
2193                sample_batch_certificate_for_round_with_committee(
2194                    leader_round_2,
2195                    previous_certificate_ids,
2196                    &private_keys[idx],
2197                    &endorsements[..],
2198                    rng,
2199                )
2200            })
2201            .collect();
2202        certificates_by_round.insert(leader_round_2, round4_certs.clone());
2203
2204        // Insert all certificates into the storage and DAG.
2205        for certificate in certificates_by_round.into_iter().flat_map(|(_, certs)| certs) {
2206            storage.testing_only_insert_certificate_testing_only(certificate.clone());
2207            bft.update_dag::<false, false>(certificate).await.unwrap();
2208        }
2209
2210        let leader_certificate_2 = storage.get_certificate_for_round_with_author(leader_round_2, leader2).unwrap();
2211
2212        assert!(
2213            !bft.is_linked(leader_certificate_1.clone(), leader_certificate_2.clone()).unwrap(),
2214            "Leader certificate 1 should not be linked to leader certificate 2"
2215        );
2216        assert_eq!(bft.dag.read().last_committed_round(), 0);
2217
2218        // Explicitely commit leader certificate 2.
2219        bft.commit_leader_certificate::<false, false>(leader_certificate_2.clone()).await.unwrap();
2220
2221        // Leader certificate 1 should be committed transitively when committing the leader certificate 2.
2222        assert!(
2223            !bft.dag.read().is_recently_committed(leader_round_1, leader_certificate_1.id()),
2224            "Leader certificate for round 2 should not be committed when committing at round 4"
2225        );
2226
2227        // Leader certificate 2 should be committed as the above call was successful.
2228        assert!(
2229            bft.dag.read().is_recently_committed(leader_round_2, leader_certificate_2.id()),
2230            "Leader certificate for round 4 should be committed"
2231        );
2232        assert_eq!(bft.dag.read().last_committed_round(), 4);
2233    }
2234}